/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.flink.state.processor;

import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.statefun.flink.state.processor.BootstrapDataRouterProvider;
import org.apache.flink.statefun.flink.state.processor.StateBootstrapFunctionProvider;
import org.apache.flink.statefun.flink.state.processor.operator.FunctionsStateBootstrapOperator;
import org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry;
import org.apache.flink.statefun.flink.state.processor.union.BootstrapDataset;
import org.apache.flink.statefun.flink.state.processor.union.BootstrapDatasetUnion;
import org.apache.flink.statefun.flink.state.processor.union.TaggedBootstrapData;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.util.Preconditions;

public class StatefulFunctionsSavepointCreator {
    private final int maxParallelism;
    private StateBackend stateBackend;
    private final StateBootstrapFunctionRegistry stateBootstrapFunctionRegistry = new StateBootstrapFunctionRegistry();
    private final List<BootstrapDataset<?>> bootstrapDatasets = new LinkedList();

    public StatefulFunctionsSavepointCreator(int maxParallelism) {
        Preconditions.checkArgument((maxParallelism > 0 ? 1 : 0) != 0);
        this.maxParallelism = maxParallelism;
        try {
            this.stateBackend = new RocksDBStateBackend("file:///tmp/ignored");
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public StatefulFunctionsSavepointCreator withFsStateBackend() {
        this.stateBackend = new FsStateBackend("file:///tmp/ignored");
        return this;
    }

    public <IN> StatefulFunctionsSavepointCreator withBootstrapData(DataSet<IN> bootstrapDataset, BootstrapDataRouterProvider<IN> routerProvider) {
        this.bootstrapDatasets.add(new BootstrapDataset<IN>(bootstrapDataset, routerProvider));
        return this;
    }

    public StatefulFunctionsSavepointCreator withStateBootstrapFunctionProvider(FunctionType functionType, StateBootstrapFunctionProvider bootstrapFunctionProvider) {
        this.stateBootstrapFunctionRegistry.register(functionType, bootstrapFunctionProvider);
        return this;
    }

    public void write(String path) {
        Preconditions.checkState((this.bootstrapDatasets.size() > 0 ? 1 : 0) != 0, (Object)"At least 1 bootstrap DataSet must be registered.");
        Preconditions.checkState((this.stateBootstrapFunctionRegistry.numRegistrations() > 0 ? 1 : 0) != 0, (Object)"At least 1 StateBootstrapFunctionProvider must be registered.");
        NewSavepoint newSavepoint = Savepoint.create((StateBackend)this.stateBackend, (int)this.maxParallelism);
        DataSet<TaggedBootstrapData> taggedUnionBootstrapDataset = BootstrapDatasetUnion.apply(this.bootstrapDatasets);
        BootstrapTransformation bootstrapTransformation = OperatorTransformation.bootstrapWith(taggedUnionBootstrapDataset).keyBy((KeySelector & Serializable)data -> data.getTarget().id()).transform((timestamp, savepointPath) -> new FunctionsStateBootstrapOperator(this.stateBootstrapFunctionRegistry, timestamp, savepointPath));
        newSavepoint.withOperator("functions_uid1", bootstrapTransformation);
        newSavepoint.write(path);
    }
}

