package org.apache.hudi.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
import org.apache.hudi.adapter.MailboxExecutorAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/source/StreamReadOperator.class */
/**
 * Operator that reads the records of {@link MergeOnReadInputSplit}s received from upstream and
 * emits them as {@link RowData}.
 *
 * <p>Incoming splits are appended to a queue. Reading is done by a task submitted to the mailbox
 * executor; each task reads at most {@link #MINI_BATCH_SIZE} records from the split at the head of
 * the queue and then re-schedules itself while splits remain, so other mailbox work can run between
 * mini-batches. A split is removed from the queue once the input format reports its end.
 *
 * <p>The queue of pending splits is written to operator list state on snapshot and re-populated
 * on restore.
 */
public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData> implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);

    /** Maximum number of records read from a split by a single mailbox task. */
    private static final int MINI_BATCH_SIZE = 2048;

    /** Executor on which {@link #processSplits()} tasks are scheduled. */
    private final MailboxExecutorAdapter executor;

    /** Reader for the split at the head of {@link #splits}; set to {@code null} by {@link #close()}. */
    private MergeOnReadInputFormat format;

    /** Context through which records and the final watermark are emitted. */
    private transient SourceFunction.SourceContext<RowData> sourceContext;

    /** Checkpointed copy of {@link #splits}. */
    private transient ListState<MergeOnReadInputSplit> inputSplitsState;

    /** Pending splits; the head is the split currently being read. */
    private transient Queue<MergeOnReadInputSplit> splits;

    /** RUNNING while a {@link #processSplits()} task is scheduled or executing, IDLE otherwise. */
    private transient volatile SplitState currentSplitState;

    /** Factory that creates {@link StreamReadOperator} instances bound to the task's mailbox executor. */
    public static class OperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData> implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
        private final MergeOnReadInputFormat format;

        private OperatorFactory(MergeOnReadInputFormat format) {
            this.format = format;
        }

        @Override
        @SuppressWarnings("unchecked")
        public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
            StreamReadOperator operator = new StreamReadOperator(this.format, this.processingTimeService, getMailboxExecutorAdapter());
            operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
            // Safe: the operator produced here is always a StreamReadOperator (see getStreamOperatorClass).
            return (O) operator;
        }

        @Override
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return StreamReadOperator.class;
        }
    }

    /** Scheduling state of the split-reading task. */
    public enum SplitState {
        /** No {@link #processSplits()} task is pending; a new one may be scheduled. */
        IDLE,
        /** A {@link #processSplits()} task has been submitted and has not yet finished. */
        RUNNING
    }

    private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService processingTimeService, MailboxExecutorAdapter mailboxExecutor) {
        this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
        this.processingTimeService = processingTimeService;
        this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
    }

    /**
     * Registers the split list state, restores pending splits when recovering, creates the source
     * context and schedules reading of any restored splits.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        // Pending splits are kept in operator list state, serialized with JavaSerializer.
        this.inputSplitsState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));

        this.currentSplitState = SplitState.IDLE;
        this.splits = new LinkedBlockingDeque<>();
        if (context.isRestored()) {
            LOG.info("Restoring state for operator {} (task ID: {}).",
                    getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
            for (MergeOnReadInputSplit split : this.inputSplitsState.get()) {
                this.splits.add(split);
            }
        }

        this.sourceContext = Utils.getSourceContext(
                getOperatorConfig().getTimeCharacteristic(),
                getProcessingTimeService(),
                getContainingTask(),
                this.output,
                getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());

        // Resume reading any splits restored from state.
        enqueueProcessSplits();
    }

    /** Replaces the checkpointed split list with a copy of the currently pending splits. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.inputSplitsState.clear();
        this.inputSplitsState.addAll(new ArrayList<>(this.splits));
    }

    /** Queues the incoming split and schedules reading if no read task is pending. */
    @Override
    public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
        this.splits.add(element.getValue());
        enqueueProcessSplits();
    }

    /**
     * Submits a {@link #processSplits()} task to the mailbox executor, unless one is already
     * pending or there is nothing to read.
     */
    private void enqueueProcessSplits() {
        if (this.currentSplitState != SplitState.IDLE || this.splits.isEmpty()) {
            return;
        }
        this.currentSplitState = SplitState.RUNNING;
        this.executor.execute(this::processSplits, "process input split");
    }

    /**
     * Reads one mini-batch from the split at the head of the queue, opening the input format for it
     * first if the format is closed, then re-schedules itself while splits remain.
     *
     * @throws IOException if opening or reading the split fails
     */
    private void processSplits() throws IOException {
        MergeOnReadInputSplit split = this.splits.peek();
        if (split == null) {
            // Release the RUNNING flag; otherwise no later split could ever be scheduled.
            this.currentSplitState = SplitState.IDLE;
            return;
        }
        try {
            if (this.format.isClosed()) {
                LOG.info("Processing input split : {}", split);
                this.format.open(split);
            }
            consumeAsMiniBatch(split);
        } finally {
            this.currentSplitState = SplitState.IDLE;
        }
        // Must run after the flag is reset: while RUNNING, enqueueProcessSplits() is a no-op and the
        // remaining records of the split would not be read until another element arrived.
        enqueueProcessSplits();
    }

    /**
     * Emits at most {@link #MINI_BATCH_SIZE} records from {@code split}. When the format reports the
     * end of the split, closes the format and removes the split from the queue.
     */
    private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
        for (int i = 0; i < MINI_BATCH_SIZE; i++) {
            if (this.format.reachedEnd()) {
                this.format.close();
                this.splits.poll();
                return;
            }
            this.sourceContext.collect(this.format.nextRecord(null));
            split.consume();
        }
    }

    @Override
    public void processWatermark(Watermark mark) {
        // Intentionally empty: upstream watermarks are not forwarded by this operator.
    }

    /** Closes and releases the input format and drops the source context. */
    @Override // org.apache.hudi.adapter.AbstractStreamOperatorAdapter
    public void close() throws Exception {
        super.close();
        if (this.format != null) {
            this.format.close();
            this.format.closeInputFormat();
            this.format = null;
        }
        this.sourceContext = null;
    }

    /** Closes the output, then emits {@link Watermark#MAX_WATERMARK} and closes the source context. */
    @Override // org.apache.hudi.adapter.AbstractStreamOperatorAdapter
    public void finish() throws Exception {
        super.finish();
        // NOTE(review): the final watermark is emitted after output.close(); confirm this ordering
        // is intended for the Flink versions targeted by the adapter.
        this.output.close();
        if (this.sourceContext != null) {
            this.sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.sourceContext.close();
            this.sourceContext = null;
        }
    }

    /**
     * Returns a factory that creates {@link StreamReadOperator}s reading with the given format.
     *
     * @param mergeOnReadInputFormat format used to read each split
     */
    public static OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory(MergeOnReadInputFormat mergeOnReadInputFormat) {
        return new OperatorFactory(mergeOnReadInputFormat);
    }
}
