package org.apache.flink.state.api;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.FileCopyFunction;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.output.StatePathExtractor;
import org.apache.flink.state.api.output.operators.GroupReduceOperator;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.StateBootstrapTransformationWithID;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/state/api/SavepointWriter.class */
public class SavepointWriter {

    @Nullable
    private final StreamExecutionEnvironment executionEnvironment;
    private final Map<OperatorIdentifier, OperatorIdentifier> uidTransformationMap = new HashMap();
    protected final SavepointMetadataV2 metadata;

    @Nullable
    protected final StateBackend stateBackend;
    private final Configuration configuration;

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriter$CheckpointMetadataCheckpointMetadataMapFunction.class */
    private static class CheckpointMetadataCheckpointMetadataMapFunction extends RichMapFunction<CheckpointMetadata, CheckpointMetadata> {
        private static final long serialVersionUID = 1;
        private final Map<OperatorIdentifier, OperatorIdentifier> uidTransformationMap;

        public CheckpointMetadataCheckpointMetadataMapFunction(Map<OperatorIdentifier, OperatorIdentifier> map) {
            this.uidTransformationMap = new HashMap(map);
        }

        public CheckpointMetadata map(CheckpointMetadata checkpointMetadata) throws Exception {
            return new CheckpointMetadata(checkpointMetadata.getCheckpointId(), (List) checkpointMetadata.getOperatorStates().stream().map(operatorState -> {
                OperatorIdentifier remove = this.uidTransformationMap.remove(OperatorIdentifier.forUidHash(operatorState.getOperatorID().toHexString()));
                return remove != null ? operatorState.copyWithNewOperatorID(remove.getOperatorId()) : operatorState;
            }).collect(Collectors.toList()), checkpointMetadata.getMasterStates());
        }

        public void close() throws Exception {
            if (!this.uidTransformationMap.isEmpty()) {
                throw new FlinkRuntimeException("Some identifier changes were never applied!" + ((String) this.uidTransformationMap.entrySet().stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining("\n\t", "\n\t", ""))));
            }
        }
    }

    @Deprecated
    public static SavepointWriter fromExistingSavepoint(String str) throws IOException {
        return new SavepointWriter(readSavepointMetadata(str), null, null);
    }

    public static SavepointWriter fromExistingSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, String str) throws IOException {
        return new SavepointWriter(readSavepointMetadata(str), null, streamExecutionEnvironment);
    }

    @Deprecated
    public static SavepointWriter fromExistingSavepoint(String str, StateBackend stateBackend) throws IOException {
        return new SavepointWriter(readSavepointMetadata(str), stateBackend, null);
    }

    public static SavepointWriter fromExistingSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, String str, StateBackend stateBackend) throws IOException {
        return new SavepointWriter(readSavepointMetadata(str), stateBackend, streamExecutionEnvironment);
    }

    private static SavepointMetadataV2 readSavepointMetadata(String str) throws IOException {
        CheckpointMetadata loadSavepointMetadata = SavepointLoader.loadSavepointMetadata(str);
        return new SavepointMetadataV2(((Integer) loadSavepointMetadata.getOperatorStates().stream().map((v0) -> {
            return v0.getMaxParallelism();
        }).max(Comparator.naturalOrder()).orElseThrow(() -> {
            return new RuntimeException("Savepoint must contain at least one operator state.");
        })).intValue(), loadSavepointMetadata.getMasterStates(), loadSavepointMetadata.getOperatorStates());
    }

    @Deprecated
    public static SavepointWriter newSavepoint(int i) {
        return new SavepointWriter(createSavepointMetadata(i), null, null);
    }

    public static SavepointWriter newSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, int i) {
        return new SavepointWriter(createSavepointMetadata(i), null, streamExecutionEnvironment);
    }

    @Deprecated
    public static SavepointWriter newSavepoint(StateBackend stateBackend, int i) {
        return new SavepointWriter(createSavepointMetadata(i), stateBackend, null);
    }

    public static SavepointWriter newSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, StateBackend stateBackend, int i) {
        return new SavepointWriter(createSavepointMetadata(i), stateBackend, streamExecutionEnvironment);
    }

    private static SavepointMetadataV2 createSavepointMetadata(int i) {
        Preconditions.checkArgument(i > 0 && i <= 32768, "Maximum parallelism must be between 1 and 32768. Found: " + i);
        return new SavepointMetadataV2(i, Collections.emptyList(), Collections.emptyList());
    }

    private SavepointWriter(SavepointMetadataV2 savepointMetadataV2, @Nullable StateBackend stateBackend, @Nullable StreamExecutionEnvironment streamExecutionEnvironment) {
        Preconditions.checkNotNull(savepointMetadataV2, "The savepoint metadata must not be null");
        this.metadata = savepointMetadataV2;
        this.stateBackend = stateBackend;
        this.configuration = new Configuration();
        this.executionEnvironment = streamExecutionEnvironment;
    }

    @Deprecated
    public SavepointWriter removeOperator(String str) {
        return removeOperator(OperatorIdentifier.forUid(str));
    }

    public SavepointWriter removeOperator(OperatorIdentifier operatorIdentifier) {
        this.metadata.removeOperator(operatorIdentifier);
        return this;
    }

    @Deprecated
    public <T> SavepointWriter withOperator(String str, StateBootstrapTransformation<T> stateBootstrapTransformation) {
        return withOperator(OperatorIdentifier.forUid(str), stateBootstrapTransformation);
    }

    public <T> SavepointWriter withOperator(OperatorIdentifier operatorIdentifier, StateBootstrapTransformation<T> stateBootstrapTransformation) {
        this.metadata.addOperator(operatorIdentifier, stateBootstrapTransformation);
        return this;
    }

    public <T> SavepointWriter withConfiguration(ConfigOption<T> configOption, T t) {
        this.configuration.set(configOption, t);
        return this;
    }

    public SavepointWriter changeOperatorIdentifier(OperatorIdentifier operatorIdentifier, OperatorIdentifier operatorIdentifier2) {
        this.uidTransformationMap.put(operatorIdentifier, operatorIdentifier2);
        return this;
    }

    public final void write(String str) {
        Path path = new Path(str);
        Optional<DataStream<OperatorState>> writeOperatorStates = writeOperatorStates(this.metadata.getNewOperators(), this.configuration, path);
        if (this.executionEnvironment == null && !writeOperatorStates.isPresent()) {
            throw new IllegalStateException("Savepoint must contain at least one operator if no execution environment was provided.");
        }
        List<OperatorState> existingOperators = this.metadata.getExistingOperators();
        if (!writeOperatorStates.isPresent() && existingOperators.isEmpty()) {
            throw new IllegalStateException("Savepoint must contain at least one operator to be created.");
        }
        getFinalOperatorStates(this.executionEnvironment != null ? this.executionEnvironment : writeOperatorStates.get().getExecutionEnvironment(), existingOperators, writeOperatorStates.orElse(null), str).transform("reduce(OperatorState)", TypeInformation.of(CheckpointMetadata.class), new GroupReduceOperator(new MergeOperatorStates(this.metadata.getMasterStates()))).forceNonParallel().map(new CheckpointMetadataCheckpointMetadataMapFunction(this.uidTransformationMap)).setParallelism(1).addSink(new OutputFormatSinkFunction(new SavepointOutputFormat(path))).setParallelism(1).name(str);
    }

    private static DataStream<OperatorState> getFinalOperatorStates(StreamExecutionEnvironment streamExecutionEnvironment, List<OperatorState> list, @Nullable DataStream<OperatorState> dataStream, String str) {
        if (list.isEmpty()) {
            return dataStream;
        }
        DataStream name = streamExecutionEnvironment.fromCollection(list).name("existingOperatorStates");
        name.flatMap(new StatePathExtractor()).setParallelism(1).addSink(new OutputFormatSinkFunction(new FileCopyFunction(str)));
        return dataStream != null ? dataStream.union(new DataStream[]{name}) : name;
    }

    private Optional<DataStream<OperatorState>> writeOperatorStates(List<StateBootstrapTransformationWithID<?>> list, Configuration configuration, Path path) {
        return list.stream().map(stateBootstrapTransformationWithID -> {
            return stateBootstrapTransformationWithID.getBootstrapTransformation().writeOperatorState(stateBootstrapTransformationWithID.getOperatorID(), this.stateBackend, configuration, this.metadata.getMaxParallelism(), path);
        }).reduce((obj, dataStream) -> {
            return ((DataStream) obj).union(new DataStream[]{dataStream});
        });
    }
}
