/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.state.api.input.splits.OperatorStateInputSplit;
import org.apache.flink.streaming.api.operators.BackendRestorerProcedure;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/**
 * Base input format that restores an {@link OperatorStateBackend} from the operator state handles
 * carried by an {@link OperatorStateInputSplit} and emits the elements that the concrete subclass
 * extracts from that backend via {@link #getElements(OperatorStateBackend)}.
 *
 * <p>Lifecycle: {@link #open(OperatorStateInputSplit)} creates a {@link CloseableRegistry} and
 * restores the backend; {@link #close()} releases both. If {@code open} fails part-way, everything
 * it had already acquired is released before the exception is propagated.
 *
 * @param <OT> the type of the records produced by this format
 */
@Internal
abstract class OperatorStateInputFormat<OT>
extends RichInputFormat<OT, OperatorStateInputSplit> {
    private static final long serialVersionUID = -2286490341042373742L;

    /** The operator state whose managed operator state handles are read. */
    private final OperatorState operatorState;

    /** When {@code true}, only the first redistributed split is read (see {@link #createInputSplits(int)}). */
    private final boolean isUnionType;

    /** Backend restored in {@link #open(OperatorStateInputSplit)}; {@code null} while not open. */
    private transient OperatorStateBackend restoredBackend;

    /** Registry handed to the backend builder and restorer; {@code null} while not open. */
    private transient CloseableRegistry registry;

    /** Iterator over the elements of the currently opened split. */
    private transient Iterator<OT> elements;

    /**
     * @param operatorState the operator state to read; must not be {@code null}
     * @param isUnionType whether the state is to be treated as union state when creating splits
     * @throws NullPointerException if {@code operatorState} is {@code null}
     */
    OperatorStateInputFormat(OperatorState operatorState, boolean isUnionType) {
        Preconditions.checkNotNull(operatorState, "The operator state cannot be null");
        this.operatorState = operatorState;
        this.isUnionType = isUnionType;
    }

    /**
     * Extracts the elements to emit from a restored operator state backend.
     *
     * @param var1 the backend restored for the current split
     * @return the elements to iterate over for the current split
     * @throws Exception if the state cannot be read from the backend
     */
    protected abstract Iterable<OT> getElements(OperatorStateBackend var1) throws Exception;

    /** No configuration is needed; intentionally a no-op. */
    @Override
    public void configure(Configuration parameters) {
    }

    /** Returns the cached statistics unchanged; this format computes none of its own. */
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics;
    }

    /** Wraps the given splits in a {@link DefaultInputSplitAssigner}. */
    @Override
    public InputSplitAssigner getInputSplitAssigner(OperatorStateInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    /**
     * Creates the input splits by redistributing the operator state over {@code minNumSplits}
     * partitions. For union-type state, only the first of those splits is kept and its handles
     * are sub-partitioned again into up to {@code minNumSplits} splits.
     *
     * @param minNumSplits the requested number of splits
     * @return the splits to read; may be empty
     */
    @Override
    public OperatorStateInputSplit[] createInputSplits(int minNumSplits) {
        OperatorStateInputSplit[] splits = this.getOperatorStateInputSplits(minNumSplits);
        if (this.isUnionType) {
            return this.subPartitionSingleSplit(minNumSplits, splits);
        }
        return splits;
    }

    /**
     * Takes the state handles of the first split and partitions them into new, re-indexed splits.
     * An empty input is returned as-is.
     */
    private OperatorStateInputSplit[] subPartitionSingleSplit(int minNumSplits, OperatorStateInputSplit[] splits) {
        if (splits.length == 0) {
            return splits;
        }
        // NOTE(review): only splits[0] is read. Presumably every split of a union state carries
        // the complete state, so reading a single one avoids duplicate output - confirm.
        return (OperatorStateInputSplit[])CollectionUtil.mapWithIndex((Collection)CollectionUtil.partition((Collection)splits[0].getPrioritizedManagedOperatorState().get(0).asList(), (int)minNumSplits), (state, index) -> new OperatorStateInputSplit((StateObjectCollection<OperatorStateHandle>)new StateObjectCollection(new ArrayList(state)), (int)index)).toArray(OperatorStateInputSplit[]::new);
    }

    /**
     * Redistributes the managed operator state with the round-robin repartitioner and wraps each
     * resulting group of handles in an {@link OperatorStateInputSplit} numbered by its position.
     */
    private OperatorStateInputSplit[] getOperatorStateInputSplits(int minNumSplits) {
        Map newManagedOperatorStates = StateAssignmentOperation.reDistributePartitionableStates(Collections.singletonList(this.operatorState), (int)minNumSplits, Collections.singletonList(OperatorIDPair.generatedIDOnly((OperatorID)this.operatorState.getOperatorID())), OperatorSubtaskState::getManagedOperatorState, (OperatorStateRepartitioner)RoundRobinOperatorStateRepartitioner.INSTANCE);
        return (OperatorStateInputSplit[])CollectionUtil.mapWithIndex(newManagedOperatorStates.values(), (handles, index) -> new OperatorStateInputSplit((StateObjectCollection<OperatorStateHandle>)new StateObjectCollection((Collection)handles), (int)index)).toArray(OperatorStateInputSplit[]::new);
    }

    /**
     * Restores the operator state backend for the given split and prepares the element iterator.
     *
     * <p>If restoring the backend or reading the elements fails, the registry and any restored
     * backend are released before the exception leaves this method, so a failed {@code open} does
     * not leak resources even when the caller never invokes {@link #close()}.
     *
     * @param split the split whose state handles are restored
     * @throws IOException if the backend cannot be restored or the state cannot be read from it
     */
    @Override
    public void open(OperatorStateInputSplit split) throws IOException {
        this.registry = new CloseableRegistry();
        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =
                new BackendRestorerProcedure<>(
                        handles -> OperatorStateInputFormat.createOperatorStateBackend(this.getRuntimeContext(), handles, this.registry),
                        this.registry,
                        this.operatorState.getOperatorID().toString());
        boolean opened = false;
        try {
            try {
                this.restoredBackend = backendRestorer.createAndRestore(split.getPrioritizedManagedOperatorState());
            }
            catch (Exception exception) {
                throw new IOException("Failed to restore state backend", exception);
            }
            try {
                this.elements = this.getElements(this.restoredBackend).iterator();
            }
            catch (Exception e) {
                throw new IOException("Failed to read operator state from restored state backend", e);
            }
            opened = true;
        }
        finally {
            if (!opened) {
                // Do not rely on the caller invoking close() after a failed open().
                this.releaseResources();
            }
        }
    }

    /**
     * Releases the restored backend and the registry. Safe to call when the format was never
     * opened, when {@code open} failed, and more than once.
     */
    @Override
    public void close() {
        this.releaseResources();
    }

    /** Unregisters and quietly closes the backend, quietly closes the registry, then clears both references. */
    private void releaseResources() {
        if (this.registry != null && this.restoredBackend != null) {
            // Unregister first so that closing the registry does not close the backend a second time.
            this.registry.unregisterCloseable(this.restoredBackend);
        }
        IOUtils.closeQuietly(this.restoredBackend);
        IOUtils.closeQuietly(this.registry);
        this.restoredBackend = null;
        this.registry = null;
    }

    /** Returns {@code true} once the element iterator of the current split is exhausted. */
    @Override
    public boolean reachedEnd() {
        return !this.elements.hasNext();
    }

    /**
     * Returns the next element of the current split.
     *
     * @param reuse ignored; the elements come straight from the restored state
     */
    @Override
    public OT nextRecord(OT reuse) {
        return this.elements.next();
    }

    /**
     * Builds an operator state backend from the given state handles using the runtime context's
     * user-code class loader and execution config.
     *
     * @throws RuntimeException wrapping the {@link BackendBuildingException} if the build fails
     */
    private static OperatorStateBackend createOperatorStateBackend(RuntimeContext runtimeContext, Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) {
        try {
            return new DefaultOperatorStateBackendBuilder(runtimeContext.getUserCodeClassLoader(), runtimeContext.getExecutionConfig(), false, stateHandles, cancelStreamRegistry).build();
        }
        catch (BackendBuildingException e) {
            throw new RuntimeException(e);
        }
    }
}

