package org.apache.flink.state.api.input;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService;
import org.apache.flink.state.api.runtime.SavepointEnvironment;
import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
import org.apache.flink.state.api.runtime.VoidTriggerable;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

/**
 * Input format that reads the keyed state of one operator from an {@link OperatorState} and
 * hands every key to a user supplied {@link KeyedStateReaderFunction}.
 *
 * <p>The state is split by key group range: each {@link KeyGroupRangeInputSplit} restores a
 * keyed state backend for its range, iterates over the keys found for the state descriptors
 * the user function registered, and buffers whatever the function emits per key.
 *
 * @param <K> type of the keys of the restored state
 * @param <OUT> type of the records emitted by the user function
 */
@Internal
public class KeyedStateInputFormat<K, OUT> extends RichInputFormat<OUT, KeyGroupRangeInputSplit> implements KeyContext {
    private static final long serialVersionUID = 8230460226049597182L;

    /** Name of the timer service to restore; also used as namespace of the timer list states. */
    private static final String USER_TIMERS_NAME = "user-timers";

    private final OperatorState operatorState;
    private final StateBackend stateBackend;
    private final TypeInformation<K> keyType;
    private final KeyedStateReaderFunction<K, OUT> userFunction;

    // Everything below is rebuilt in openInputFormat() / open() and therefore not serialized.
    private transient TypeSerializer<K> keySerializer;
    private transient CloseableRegistry registry;
    private transient BufferingCollector<OUT> out;
    private transient Iterator<K> keys;
    private transient AbstractKeyedStateBackend<K> keyedStateBackend;
    private transient Context<K> ctx;

    /**
     * Context passed to the user function; exposes the timers that were found in the restored
     * timer service.
     *
     * <p>On construction every timer registered under {@link VoidNamespace#INSTANCE} is copied
     * into a list state of the keyed backend, so that later lookups read them back through the
     * backend.
     * NOTE(review): the lookups presumably rely on the backend's current key, and the copy on
     * the timer service selecting the owning key before each callback - confirm against the
     * timer service implementation.
     *
     * @param <K> type of the keys of the backend
     */
    public static class Context<K> implements KeyedStateReaderFunction.Context {
        private static final String EVENT_TIMER_STATE = "event-time-timers";
        private static final String PROC_TIMER_STATE = "proc-time-timers";

        ListState<Long> eventTimers;
        ListState<Long> procTimers;

        private Context(AbstractKeyedStateBackend<K> abstractKeyedStateBackend, InternalTimerService<VoidNamespace> internalTimerService) throws Exception {
            this.eventTimers = abstractKeyedStateBackend.getPartitionedState(
                    KeyedStateInputFormat.USER_TIMERS_NAME,
                    StringSerializer.INSTANCE,
                    new ListStateDescriptor<>(EVENT_TIMER_STATE, Types.LONG));
            internalTimerService.forEachEventTimeTimer((namespace, timestamp) -> {
                if (namespace.equals(VoidNamespace.INSTANCE)) {
                    this.eventTimers.add(timestamp);
                }
            });

            this.procTimers = abstractKeyedStateBackend.getPartitionedState(
                    KeyedStateInputFormat.USER_TIMERS_NAME,
                    StringSerializer.INSTANCE,
                    new ListStateDescriptor<>(PROC_TIMER_STATE, Types.LONG));
            internalTimerService.forEachProcessingTimeTimer((namespace, timestamp) -> {
                if (namespace.equals(VoidNamespace.INSTANCE)) {
                    this.procTimers.add(timestamp);
                }
            });
        }

        /**
         * @return the event time timers read from the event timer list state; empty if the
         *     state holds no value
         */
        @Override
        public Set<Long> registeredEventTimeTimers() throws Exception {
            return toSet(this.eventTimers);
        }

        /**
         * @return the processing time timers read from the processing timer list state; empty
         *     if the state holds no value
         */
        @Override
        public Set<Long> registeredProcessingTimeTimers() throws Exception {
            return toSet(this.procTimers);
        }

        /** Collects the content of a timer list state into a set, treating {@code null} as empty. */
        private static Set<Long> toSet(ListState<Long> timerState) throws Exception {
            final Iterable<Long> timestamps = timerState.get();
            if (timestamps == null) {
                return Collections.emptySet();
            }
            return StreamSupport.stream(timestamps.spliterator(), false).collect(Collectors.toSet());
        }
    }

    /**
     * Creates an input format for the keyed state of one operator.
     *
     * @param operatorState the state of the operator to read; must not be null
     * @param stateBackend the state backend used to restore the state; must not be null
     * @param typeInformation type information of the keys; must not be null
     * @param keyedStateReaderFunction the user function invoked once per key; must not be null
     * @throws NullPointerException if any argument is null
     */
    public KeyedStateInputFormat(OperatorState operatorState, StateBackend stateBackend, TypeInformation<K> typeInformation, KeyedStateReaderFunction<K, OUT> keyedStateReaderFunction) {
        Preconditions.checkNotNull(operatorState, "The operator state cannot be null");
        Preconditions.checkNotNull(stateBackend, "The state backend cannot be null");
        Preconditions.checkNotNull(typeInformation, "The key type information cannot be null");
        Preconditions.checkNotNull(keyedStateReaderFunction, "The userfunction cannot be null");
        this.operatorState = operatorState;
        this.stateBackend = stateBackend;
        this.keyType = typeInformation;
        this.userFunction = keyedStateReaderFunction;
    }

    /** No-op: this format takes everything it needs through its constructor. */
    @Override
    public void configure(Configuration configuration) {
    }

    /** Hands the splits to a {@link DefaultInputSplitAssigner}. */
    @Override
    public InputSplitAssigner getInputSplitAssigner(KeyGroupRangeInputSplit[] keyGroupRangeInputSplitArr) {
        return new DefaultInputSplitAssigner(keyGroupRangeInputSplitArr);
    }

    /** Returns the given statistics unchanged; no statistics are computed here. */
    @Override
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
        return baseStatistics;
    }

    /**
     * Creates one split per key group range.
     *
     * <p>The number of ranges is the smaller of {@code minNumSplits} and the operator's max
     * parallelism; ranges are ordered by their start key group and each split receives its
     * position in that order.
     *
     * @param minNumSplits requested number of splits
     * @return the splits, ordered by start key group
     */
    @Override
    public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        final int maxParallelism = this.operatorState.getMaxParallelism();
        final List<KeyGroupRange> ranges = sortedKeyGroupRanges(minNumSplits, maxParallelism);
        return CollectionUtil.mapWithIndex(
                        ranges,
                        (range, index) -> createKeyGroupRangeInputSplit(this.operatorState, maxParallelism, range, index))
                .toArray(KeyGroupRangeInputSplit[]::new);
    }

    /**
     * Alias retained for callers that still use the decompiler-mangled name.
     *
     * @deprecated use {@link #createInputSplits(int)}
     */
    @Deprecated
    public KeyGroupRangeInputSplit[] m0createInputSplits(int i) throws IOException {
        return createInputSplits(i);
    }

    /** Creates the output buffer and the key serializer used for every split. */
    @Override
    public void openInputFormat() {
        this.out = new BufferingCollector<>();
        this.keySerializer = this.keyType.createSerializer(getRuntimeContext().getExecutionConfig());
    }

    /**
     * Restores the keyed state backend and timers for the given split, opens the user function
     * and prepares the key iterator.
     *
     * @param keyGroupRangeInputSplit the split to read
     * @throws IOException if restoring the backend, opening the user function or restoring the
     *     timers fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public void open(KeyGroupRangeInputSplit keyGroupRangeInputSplit) throws IOException {
        // Assigned first so that close() can release whatever gets registered below.
        this.registry = new CloseableRegistry();

        final Environment environment = new SavepointEnvironment.Builder(getRuntimeContext(), keyGroupRangeInputSplit.getNumKeyGroups())
                .setSubtaskIndex(keyGroupRangeInputSplit.getSplitNumber())
                .setPrioritizedOperatorSubtaskState(keyGroupRangeInputSplit.getPrioritizedOperatorSubtaskState())
                .build();
        final StreamOperatorStateContext stateContext = getStreamOperatorStateContext(environment);

        // Unchecked: the backend was created with this.keySerializer (see
        // getStreamOperatorStateContext), which is a TypeSerializer<K>.
        this.keyedStateBackend = (AbstractKeyedStateBackend<K>) stateContext.keyedStateBackend();

        final DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(this.keyedStateBackend, getRuntimeContext().getExecutionConfig());
        final SavepointRuntimeContext runtimeContext = new SavepointRuntimeContext(getRuntimeContext(), keyedStateStore);
        FunctionUtils.setFunctionRuntimeContext(this.userFunction, runtimeContext);

        this.keys = getKeyIterator(runtimeContext);
        try {
            this.ctx = new Context<>(this.keyedStateBackend, restoreTimerService(stateContext));
        } catch (Exception e) {
            throw new IOException("Failed to restore timer state", e);
        }
    }

    /**
     * Looks up the timer service named {@code "user-timers"} in the restored state context.
     *
     * <p>NOTE(review): the serializer is intentionally left raw; the generic type of the timer
     * service manager is not visible here, so a parameterized serializer may not be accepted -
     * confirm before tightening the types.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private InternalTimerService<VoidNamespace> restoreTimerService(StreamOperatorStateContext streamOperatorStateContext) {
        return streamOperatorStateContext.internalTimerServiceManager().getInternalTimerService(USER_TIMERS_NAME, new TimerSerializer(this.keySerializer, VoidNamespaceSerializer.INSTANCE), VoidTriggerable.instance());
    }

    /**
     * Opens the user function, then freezes state registration and builds an iterator over the
     * keys of all state descriptors the function registered while opening.
     *
     * @throws IOException if any of these steps throws
     */
    private Iterator<K> getKeyIterator(SavepointRuntimeContext runtimeContext) throws IOException {
        try {
            FunctionUtils.openFunction(this.userFunction, new Configuration());
            runtimeContext.disableStateRegistration();
            return new MultiStateKeyIterator<>(runtimeContext.getStateDescriptors(), this.keyedStateBackend);
        } catch (Exception e) {
            throw new IOException("Failed to open user defined function", e);
        }
    }

    /**
     * Restores the operator state context for this operator in the given environment, using
     * this instance as key context and {@link #registry} for the resources that get opened.
     *
     * @throws IOException if the restore fails
     */
    private StreamOperatorStateContext getStreamOperatorStateContext(Environment environment) throws IOException {
        try {
            return new StreamTaskStateInitializerImpl(environment, this.stateBackend, new NeverFireProcessingTimeService()).streamOperatorStateContext(this.operatorState.getOperatorID(), this.operatorState.getOperatorID().toString(), this, this.keySerializer, this.registry, getRuntimeContext().getMetricGroup());
        } catch (Exception e) {
            throw new IOException("Failed to restore state backend", e);
        }
    }

    /** Closes the registry created by {@link #open}; does nothing if no split was opened. */
    @Override
    public void close() throws IOException {
        if (this.registry != null) {
            this.registry.close();
        }
    }

    /** @return true once neither buffered records nor unread keys remain */
    @Override
    public boolean reachedEnd() {
        return !this.out.hasNext() && !this.keys.hasNext();
    }

    /**
     * Returns the next buffered record, or reads the next key through the user function when
     * the buffer is empty.
     *
     * @param out ignored; records are always taken from the internal buffer
     * @return the next record from the buffer
     * @throws IOException if reading the key or advancing the iterator throws
     */
    @Override
    public OUT nextRecord(OUT out) throws IOException {
        // Records left over from the previous key are handed out before touching a new key.
        if (this.out.hasNext()) {
            return this.out.next();
        }
        final K key = this.keys.next();
        setCurrentKey(key);
        try {
            this.userFunction.readKey(key, this.ctx, this.out);
            this.keys.remove();
            // NOTE(review): if readKey emitted nothing, the result is whatever the buffer
            // returns when empty - confirm against BufferingCollector.
            return this.out.next();
        } catch (Exception e) {
            throw new IOException("User defined function KeyedStateReaderFunction#readKey threw an exception", e);
        }
    }

    /**
     * Sets the current key on the restored keyed state backend.
     *
     * @param obj the key; expected to be an instance of {@code K}
     */
    @Override
    @SuppressWarnings("unchecked")
    public void setCurrentKey(Object obj) {
        // KeyContext is untyped, the backend is not: the cast is required and unchecked.
        this.keyedStateBackend.setCurrentKey((K) obj);
    }

    /** @return the current key of the restored keyed state backend */
    @Override
    public Object getCurrentKey() {
        return this.keyedStateBackend.getCurrentKey();
    }

    /** Builds the split for one key group range from the managed and raw keyed state handles of that range. */
    private static KeyGroupRangeInputSplit createKeyGroupRangeInputSplit(OperatorState operatorState, int maxParallelism, KeyGroupRange keyGroupRange, Integer splitIndex) {
        return new KeyGroupRangeInputSplit(
                StateAssignmentOperation.getManagedKeyedStateHandles(operatorState, keyGroupRange),
                StateAssignmentOperation.getRawKeyedStateHandles(operatorState, keyGroupRange),
                maxParallelism,
                splitIndex.intValue());
    }

    /**
     * Partitions the key groups into {@code min(minNumSplits, maxParallelism)} ranges and sorts
     * them by their start key group.
     */
    @Nonnull
    private static List<KeyGroupRange> sortedKeyGroupRanges(int minNumSplits, int maxParallelism) {
        final List<KeyGroupRange> ranges = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, Math.min(minNumSplits, maxParallelism));
        ranges.sort(Comparator.comparingInt(KeyGroupRange::getStartKeyGroup));
        return ranges;
    }
}
