package org.apache.flink.state.api.input;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.input.operator.StateReaderOperator;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input format that reads the keyed state of a single operator.
 *
 * <p>Each {@link KeyGroupRangeInputSplit} is opened by building a {@link StreamOperatorStateContext}
 * for it, wiring the resulting keyed state backend and timer service manager into the
 * {@link StateReaderOperator}, and then iterating over every key/namespace pair the operator
 * exposes. For each pair the operator is invoked and whatever it emits is buffered and handed out
 * through {@link #nextRecord(Object)}.
 *
 * @param <K> type of the keys iterated over
 * @param <N> type of the namespaces iterated over
 * @param <OUT> type of the records produced by the reader operator
 */
@Internal
public class KeyedStateInputFormat<K, N, OUT> extends RichInputFormat<OUT, KeyGroupRangeInputSplit> {
    private static final long serialVersionUID = 8230460226049597182L;
    private static final Logger LOG = LoggerFactory.getLogger(KeyedStateInputFormat.class);

    /** State of the operator whose keyed state is read. */
    private final OperatorState operatorState;

    /** State backend handed to the context builder; may be {@code null}. */
    @Nullable
    private final StateBackend stateBackend;

    /** Private copy of the configuration passed to the constructor. */
    private final Configuration configuration;

    /** Operator that enumerates keys/namespaces and turns each pair into output records. */
    private final StateReaderOperator<?, K, N, OUT> operator;

    /** Execution config kept in serialized form; deserialized again in {@link #open}. */
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;

    /** Registry created per split in {@link #open} and released in {@link #close()}. */
    private transient CloseableRegistry registry;

    /** Buffer for records emitted by the operator but not yet returned to the caller. */
    private transient BufferingCollector<OUT> out;

    /** Iterator over the key/namespace pairs of the currently opened split. */
    private transient CloseableIterator<Tuple2<K, N>> keysAndNamespaces;

    /**
     * Creates an input format for the given operator state.
     *
     * @param operatorState state of the operator to read; must not be {@code null}
     * @param stateBackend state backend passed on to the context builder; may be {@code null}
     * @param configuration configuration to copy; must not be {@code null}
     * @param stateReaderOperator operator used to read the state; must not be {@code null}
     * @param executionConfig execution config, stored in serialized form; must not be {@code null}
     * @throws IOException if the execution config cannot be serialized
     */
    public KeyedStateInputFormat(OperatorState operatorState, @Nullable StateBackend stateBackend, Configuration configuration, StateReaderOperator<?, K, N, OUT> stateReaderOperator, ExecutionConfig executionConfig) throws IOException {
        Preconditions.checkNotNull(operatorState, "The operator state cannot be null");
        Preconditions.checkNotNull(configuration, "The configuration cannot be null");
        Preconditions.checkNotNull(stateReaderOperator, "The operator cannot be null");
        Preconditions.checkNotNull(executionConfig, "The executionConfig cannot be null");
        this.operatorState = operatorState;
        this.stateBackend = stateBackend;
        // Defensive copy so later changes to the caller's configuration are not observed here.
        this.configuration = new Configuration(configuration);
        this.operator = stateReaderOperator;
        this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
    }

    /** Intentionally a no-op: everything this format needs is supplied via the constructor. */
    @Override
    public void configure(Configuration configuration) {
    }

    /**
     * Wraps the given splits in a {@link DefaultInputSplitAssigner}.
     *
     * @param keyGroupRangeInputSplitArr splits to hand out
     * @return assigner over the given splits
     */
    @Override
    public InputSplitAssigner getInputSplitAssigner(KeyGroupRangeInputSplit[] keyGroupRangeInputSplitArr) {
        return new DefaultInputSplitAssigner(keyGroupRangeInputSplitArr);
    }

    /**
     * Returns the statistics it was given, unchanged; this format computes none of its own.
     *
     * @param baseStatistics previously cached statistics
     * @return {@code baseStatistics}
     */
    @Override
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
        return baseStatistics;
    }

    /**
     * Creates one split per key-group range, ordered by the start key group of each range.
     *
     * <p>The number of ranges is the smaller of {@code minNumSplits} and the operator's max
     * parallelism. Each split carries its position in that ordering.
     *
     * @param minNumSplits requested number of splits
     * @return the splits, in ascending order of start key group
     * @throws IOException declared by the overridden contract
     */
    @Override
    public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        final int maxParallelism = this.operatorState.getMaxParallelism();
        final List<KeyGroupRange> keyGroups = sortedKeyGroupRanges(minNumSplits, maxParallelism);
        return CollectionUtil.mapWithIndex(
                        keyGroups,
                        (keyGroupRange, index) -> createKeyGroupRangeInputSplit(this.operatorState, maxParallelism, keyGroupRange, index))
                .toArray(KeyGroupRangeInputSplit[]::new);
    }

    /**
     * Decompiler-generated alias of {@link #createInputSplits(int)}, kept only so existing callers
     * of the mangled name keep working.
     *
     * @param i requested number of splits
     * @return the result of {@link #createInputSplits(int)}
     * @throws IOException see {@link #createInputSplits(int)}
     * @deprecated use {@link #createInputSplits(int)}
     */
    @Deprecated
    public KeyGroupRangeInputSplit[] m1createInputSplits(int i) throws IOException {
        return createInputSplits(i);
    }

    /** Creates the output buffer shared by all splits processed by this instance. */
    @Override
    public void openInputFormat() {
        this.out = new BufferingCollector<>();
    }

    /**
     * Prepares this format for reading the given split.
     *
     * <p>Builds the operator state context for the split, sets up and opens the reader operator
     * against it, and obtains the key/namespace iterator. If any of that fails, the registry
     * created for this split is closed before the exception propagates.
     *
     * @param keyGroupRangeInputSplit split to read
     * @throws IOException if building the context fails, or if setting up or opening the operator
     *     or obtaining its key/namespace iterator throws
     * @throws RuntimeException if the execution config class cannot be found
     */
    @Override
    public void open(KeyGroupRangeInputSplit keyGroupRangeInputSplit) throws IOException {
        this.registry = new CloseableRegistry();
        final RuntimeContext runtimeContext = getRuntimeContext();
        boolean opened = false;
        try {
            final ExecutionConfig executionConfig = this.serializedExecutionConfig.deserializeValue(runtimeContext.getUserCodeClassLoader());
            final StreamOperatorStateContext context =
                    new StreamOperatorContextBuilder(runtimeContext, this.configuration, this.operatorState, keyGroupRangeInputSplit, this.registry, this.stateBackend, executionConfig)
                            .withMaxParallelism(keyGroupRangeInputSplit.getNumKeyGroups())
                            .withKey(this.operator, runtimeContext.createSerializer(this.operator.getKeyType()))
                            .build(LOG);

            // The context does not carry the key type statically; the backend was built with this
            // operator's key serializer (see withKey above), so the cast is expected to hold.
            @SuppressWarnings("unchecked")
            final KeyedStateBackend<K> keyedStateBackend = (AbstractKeyedStateBackend<K>) context.keyedStateBackend();
            final SavepointRuntimeContext savepointRuntimeContext =
                    new SavepointRuntimeContext(runtimeContext, new DefaultKeyedStateStore(keyedStateBackend, runtimeContext::createSerializer));
            // NOTE(review): cast added in case the accessor is declared with a wildcard key type;
            // it is redundant if the accessor is already typed with the key — confirm.
            @SuppressWarnings("unchecked")
            final InternalTimeServiceManager<K> timeServiceManager = (InternalTimeServiceManager<K>) context.internalTimerServiceManager();
            try {
                this.operator.setup(runtimeContext::createSerializer, keyedStateBackend, timeServiceManager, savepointRuntimeContext);
                this.operator.open();
                this.keysAndNamespaces = this.operator.getKeysAndNamespaces(savepointRuntimeContext);
            } catch (Exception e) {
                throw new IOException("Failed to restore timer state", e);
            }
            opened = true;
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not deserialize ExecutionConfig.", e);
        } finally {
            if (!opened) {
                // Do not rely on the caller invoking close() after a failed open(): release
                // whatever was registered for this split right away.
                IOUtils.closeQuietly(this.registry);
            }
        }
    }

    /**
     * Releases the per-split resources: the key/namespace iterator, the operator and the registry,
     * in that order.
     *
     * @throws IOException if releasing the resources throws
     */
    @Override
    public void close() throws IOException {
        try {
            IOUtils.closeQuietly(this.keysAndNamespaces);
            IOUtils.closeQuietly(this.operator);
            IOUtils.closeQuietly(this.registry);
        } catch (Exception e) {
            throw new IOException("Failed to close state backend", e);
        }
    }

    /**
     * Tells whether the current split is exhausted.
     *
     * @return {@code true} once no buffered record is left and no key/namespace pair remains
     */
    @Override
    public boolean reachedEnd() {
        return !this.out.hasNext() && !this.keysAndNamespaces.hasNext();
    }

    /**
     * Returns the next record of the current split.
     *
     * <p>Buffered records are drained first. Once the buffer is empty, the next key/namespace pair
     * is processed by the operator and the first record it buffered is returned.
     *
     * @param out reuse object supplied by the caller; not used by this implementation
     * @return the next buffered record; if the operator emitted nothing for the pair, whatever the
     *     buffer yields when empty (presumably {@code null} — confirm against BufferingCollector)
     * @throws IOException if processing the key/namespace pair throws
     */
    @Override
    public OUT nextRecord(OUT out) throws IOException {
        if (this.out.hasNext()) {
            return this.out.next();
        }
        final Tuple2<K, N> keyAndNamespace = this.keysAndNamespaces.next();
        this.operator.setCurrentKey(keyAndNamespace.f0);
        try {
            this.operator.processElement(keyAndNamespace.f0, keyAndNamespace.f1, this.out);
            this.keysAndNamespaces.remove();
            return this.out.next();
        } catch (Exception e) {
            throw new IOException("User defined function KeyedStateReaderFunction#readKey threw an exception", e);
        }
    }

    /**
     * Builds the split for one key-group range from the managed and raw keyed state handles that
     * the operator state holds for that range.
     *
     * @param operatorState state to take the handles from
     * @param maxParallelism number of key groups recorded in the split
     * @param keyGroupRange range covered by the split
     * @param index position of the split among all splits
     * @return the split for {@code keyGroupRange}
     */
    private static KeyGroupRangeInputSplit createKeyGroupRangeInputSplit(OperatorState operatorState, int maxParallelism, KeyGroupRange keyGroupRange, Integer index) {
        return new KeyGroupRangeInputSplit(
                StateAssignmentOperation.getManagedKeyedStateHandles(operatorState, keyGroupRange),
                StateAssignmentOperation.getRawKeyedStateHandles(operatorState, keyGroupRange),
                maxParallelism,
                index);
    }

    /**
     * Partitions the key groups into at most {@code maxParallelism} ranges and sorts them by start
     * key group.
     *
     * @param minNumSplits requested number of ranges
     * @param maxParallelism max parallelism of the operator; also caps the number of ranges
     * @return the ranges in ascending order of start key group
     */
    @Nonnull
    private static List<KeyGroupRange> sortedKeyGroupRanges(int minNumSplits, int maxParallelism) {
        final List<KeyGroupRange> keyGroups = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, Math.min(minNumSplits, maxParallelism));
        keyGroups.sort(Comparator.comparing(KeyGroupRange::getStartKeyGroup));
        return keyGroups;
    }
}
