/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input;

import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.input.BufferingCollector;
import org.apache.flink.state.api.input.operator.StateReaderOperator;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService;
import org.apache.flink.state.api.runtime.SavepointEnvironment;
import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

/**
 * Input format that reads the keyed state of one operator from an {@link OperatorState}.
 *
 * <p>The key-group space of the operator is partitioned into {@link KeyGroupRangeInputSplit}s
 * (see {@link #createInputSplits(int)}). For every split a keyed state backend is restored
 * through {@link StreamTaskStateInitializerImpl}, the {@link StateReaderOperator} is set up on
 * top of it, and the operator is then invoked once per key/namespace pair it reports. Records
 * emitted by the operator are buffered in a {@link BufferingCollector} and handed out one at a
 * time by {@link #nextRecord(Object)}.
 *
 * @param <K> type of the keys in the restored state
 * @param <N> type of the namespaces in the restored state
 * @param <OUT> type of the records produced by the reader operator
 */
@Internal
public class KeyedStateInputFormat<K, N, OUT>
        extends RichInputFormat<OUT, KeyGroupRangeInputSplit> {
    private static final long serialVersionUID = 8230460226049597182L;

    /** State of the operator that is being read. */
    private final OperatorState operatorState;

    /** Backend used to restore the keyed state of each split. */
    private final StateBackend stateBackend;

    /** Private copy of the configuration handed to the savepoint environment. */
    private final Configuration configuration;

    /** Operator that enumerates key/namespace pairs and reads the state for each of them. */
    private final StateReaderOperator<?, K, N, OUT> operator;

    /** Tracks the resources registered while restoring the current split; null when no split is open. */
    private transient CloseableRegistry registry;

    /** Buffer for the records emitted by the operator. */
    private transient BufferingCollector<OUT> out;

    /** Key/namespace pairs of the current split that still have to be processed. */
    private transient Iterator<Tuple2<K, N>> keysAndNamespaces;

    /**
     * Creates an input format for reading keyed state.
     *
     * @param operatorState state of the operator to read; must not be null
     * @param stateBackend backend used to restore the state; must not be null
     * @param configuration configuration to restore with (copied); must not be null
     * @param operator operator that reads the state; must not be null
     */
    public KeyedStateInputFormat(OperatorState operatorState, StateBackend stateBackend, Configuration configuration, StateReaderOperator<?, K, N, OUT> operator) {
        Preconditions.checkNotNull(operatorState, "The operator state cannot be null");
        Preconditions.checkNotNull(stateBackend, "The state backend cannot be null");
        Preconditions.checkNotNull(configuration, "The configuration cannot be null");
        Preconditions.checkNotNull(operator, "The operator cannot be null");
        this.operatorState = operatorState;
        this.stateBackend = stateBackend;
        // Defensive copy so later changes by the caller do not affect this format.
        this.configuration = new Configuration(configuration);
        this.operator = operator;
    }

    /** No-op: everything this format needs is passed to the constructor. */
    public void configure(Configuration parameters) {
    }

    /**
     * Returns a {@link DefaultInputSplitAssigner} over the given splits.
     *
     * @param inputSplits splits produced by {@link #createInputSplits(int)}
     * @return assigner handing out the given splits
     */
    public InputSplitAssigner getInputSplitAssigner(KeyGroupRangeInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    /**
     * Returns the cached statistics unchanged; this format computes none of its own.
     *
     * @param cachedStatistics previously gathered statistics, possibly null
     * @return {@code cachedStatistics}
     */
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics;
    }

    /**
     * Creates one split per key-group range. The number of ranges is
     * {@code min(minNumSplits, maxParallelism)} where the max parallelism is taken from the
     * operator state.
     *
     * @param minNumSplits requested number of splits
     * @return splits ordered by the start key group of their range
     * @throws IOException declared by the input format contract
     */
    public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        final int maxParallelism = this.operatorState.getMaxParallelism();
        final List<KeyGroupRange> keyGroups = KeyedStateInputFormat.sortedKeyGroupRanges(minNumSplits, maxParallelism);
        return CollectionUtil.mapWithIndex(
                        keyGroups,
                        (keyGroupRange, index) -> KeyedStateInputFormat.createKeyGroupRangeInputSplit(this.operatorState, maxParallelism, keyGroupRange, index))
                .toArray(KeyGroupRangeInputSplit[]::new);
    }

    /** Creates the buffer that collects the operator output for all splits of this instance. */
    public void openInputFormat() {
        this.out = new BufferingCollector<>();
    }

    /**
     * Restores the keyed state backend for the given split, sets up and opens the reader
     * operator on top of it and fetches the key/namespace pairs to iterate over.
     *
     * <p>If any step fails, the registry created for this split is closed again before the
     * failure is propagated, so resources registered during a partial restore are released.
     *
     * @param split the key-group range to read
     * @throws IOException if the state backend cannot be restored or the operator fails to
     *     set up
     */
    public void open(KeyGroupRangeInputSplit split) throws IOException {
        this.registry = new CloseableRegistry();
        try {
            final SavepointEnvironment environment = new SavepointEnvironment.Builder(this.getRuntimeContext(), split.getNumKeyGroups())
                    .setConfiguration(this.configuration)
                    .setSubtaskIndex(split.getSplitNumber())
                    .setPrioritizedOperatorSubtaskState(split.getPrioritizedOperatorSubtaskState())
                    .build();
            final StreamOperatorStateContext context = this.getStreamOperatorStateContext(environment);

            // The backend is restored with this operator's key serializer, so its key type is K.
            @SuppressWarnings("unchecked")
            final AbstractKeyedStateBackend<K> keyedStateBackend = (AbstractKeyedStateBackend<K>) context.keyedStateBackend();
            final DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, this.getRuntimeContext().getExecutionConfig());
            final SavepointRuntimeContext ctx = new SavepointRuntimeContext(this.getRuntimeContext(), keyedStateStore);
            @SuppressWarnings("unchecked")
            final InternalTimeServiceManager<K> timeServiceManager = (InternalTimeServiceManager<K>) context.internalTimerServiceManager();

            try {
                this.operator.setup(this.getRuntimeContext().getExecutionConfig(), keyedStateBackend, timeServiceManager, ctx);
                this.operator.open();
                this.keysAndNamespaces = this.operator.getKeysAndNamespaces(ctx);
            }
            catch (Exception e) {
                throw new IOException("Failed to restore timer state", e);
            }
        }
        catch (IOException | RuntimeException e) {
            // Release whatever was registered before the failure; rethrow the original error as is.
            this.releaseRegistryAfterFailedOpen(e);
            throw e;
        }
    }

    /**
     * Closes and drops the current registry after {@link #open(KeyGroupRangeInputSplit)} failed.
     * A failure while closing is attached to {@code cause} as a suppressed exception.
     */
    private void releaseRegistryAfterFailedOpen(Exception cause) {
        final CloseableRegistry failedRegistry = this.registry;
        this.registry = null;
        if (failedRegistry == null) {
            return;
        }
        try {
            failedRegistry.close();
        }
        catch (IOException | RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Restores the operator's state through a {@link StreamTaskStateInitializerImpl}, using a
     * processing time service that never fires and registering resources with the current
     * registry.
     *
     * @param environment environment describing the split to restore
     * @return the restored state context
     * @throws IOException if the restore fails for any reason
     */
    private StreamOperatorStateContext getStreamOperatorStateContext(Environment environment) throws IOException {
        final StreamTaskStateInitializerImpl initializer = new StreamTaskStateInitializerImpl(environment, this.stateBackend);
        try {
            return initializer.streamOperatorStateContext(
                    this.operatorState.getOperatorID(),
                    this.operatorState.getOperatorID().toString(),
                    new NeverFireProcessingTimeService(),
                    this.operator,
                    this.operator.getKeyType().createSerializer(environment.getExecutionConfig()),
                    this.registry,
                    this.getRuntimeContext().getMetricGroup());
        }
        catch (Exception e) {
            throw new IOException("Failed to restore state backend", e);
        }
    }

    /**
     * Closes the operator and then the registry of the current split.
     *
     * <p>Both are always attempted: a failure while closing the operator does not prevent the
     * registry from being closed. If both fail, the registry failure is attached to the
     * operator failure as a suppressed exception. Calling this method when no split is open
     * skips the registry.
     *
     * @throws IOException if closing the operator or the registry fails
     */
    public void close() throws IOException {
        Exception failure = null;
        try {
            this.operator.close();
        }
        catch (Exception e) {
            failure = e;
        }

        final CloseableRegistry toClose = this.registry;
        this.registry = null;
        if (toClose != null) {
            try {
                toClose.close();
            }
            catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        if (failure != null) {
            throw new IOException("Failed to close state backend", failure);
        }
    }

    /**
     * @return true once the output buffer is drained and no key/namespace pair is left
     */
    public boolean reachedEnd() {
        return !this.out.hasNext() && !this.keysAndNamespaces.hasNext();
    }

    /**
     * Returns the next buffered record if there is one. Otherwise moves on to the next
     * key/namespace pair, runs the operator for it, removes the pair from the iterator and
     * returns the next record from the buffer.
     *
     * <p>NOTE(review): when the operator emits nothing for a pair, the result is whatever
     * {@link BufferingCollector#next()} yields on an empty buffer (presumably null) - confirm.
     *
     * @param reuse ignored; records are taken from the buffer
     * @return the next record
     * @throws IOException if the operator throws while processing a pair
     */
    public OUT nextRecord(OUT reuse) throws IOException {
        if (this.out.hasNext()) {
            return this.out.next();
        }
        final Tuple2<K, N> keyAndNamespace = this.keysAndNamespaces.next();
        this.operator.setCurrentKey(keyAndNamespace.f0);
        try {
            this.operator.processElement(keyAndNamespace.f0, keyAndNamespace.f1, this.out);
        }
        catch (Exception e) {
            throw new IOException("User defined function KeyedStateReaderFunction#readKey threw an exception", e);
        }
        this.keysAndNamespaces.remove();
        return this.out.next();
    }

    /**
     * Builds the split for one key-group range from the managed and raw keyed state handles
     * that {@link StateAssignmentOperation} selects for that range.
     */
    private static KeyGroupRangeInputSplit createKeyGroupRangeInputSplit(OperatorState operatorState, int maxParallelism, KeyGroupRange keyGroupRange, Integer index) {
        return new KeyGroupRangeInputSplit(
                StateAssignmentOperation.getManagedKeyedStateHandles(operatorState, keyGroupRange),
                StateAssignmentOperation.getRawKeyedStateHandles(operatorState, keyGroupRange),
                maxParallelism,
                index);
    }

    /**
     * Partitions {@code maxParallelism} key groups into {@code min(minNumSplits, maxParallelism)}
     * ranges and returns them sorted by their start key group.
     */
    @Nonnull
    private static List<KeyGroupRange> sortedKeyGroupRanges(int minNumSplits, int maxParallelism) {
        final List<KeyGroupRange> keyGroups = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, Math.min(minNumSplits, maxParallelism));
        keyGroups.sort(Comparator.comparingInt(KeyGroupRange::getStartKeyGroup));
        return keyGroups;
    }
}

