/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
import org.apache.hudi.adapter.MailboxExecutorAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamReadOperator
extends AbstractStreamOperatorAdapter<RowData>
implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
    private static final int MINI_BATCH_SIZE = 2048;
    private final MailboxExecutorAdapter executor;
    private MergeOnReadInputFormat format;
    private transient SourceFunction.SourceContext<RowData> sourceContext;
    private transient ListState<MergeOnReadInputSplit> inputSplitsState;
    private transient Queue<MergeOnReadInputSplit> splits;
    private volatile transient SplitState currentSplitState;

    private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService, MailboxExecutorAdapter mailboxExecutor) {
        this.format = (MergeOnReadInputFormat)((Object)Preconditions.checkNotNull((Object)((Object)format), (String)"The InputFormat should not be null."));
        this.processingTimeService = timeService;
        this.executor = (MailboxExecutorAdapter)Preconditions.checkNotNull((Object)mailboxExecutor, (String)"The mailboxExecutor should not be null.");
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.inputSplitsState = context.getOperatorStateStore().getListState(new ListStateDescriptor("splits", (TypeSerializer)new JavaSerializer()));
        this.currentSplitState = SplitState.IDLE;
        this.splits = new LinkedBlockingDeque<MergeOnReadInputSplit>();
        if (context.isRestored()) {
            int subtaskIdx = this.getRuntimeContext().getIndexOfThisSubtask();
            LOG.info("Restoring state for operator {} (task ID: {}).", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)subtaskIdx);
            for (MergeOnReadInputSplit split : (Iterable)this.inputSplitsState.get()) {
                this.splits.add(split);
            }
        }
        this.sourceContext = Utils.getSourceContext((TimeCharacteristic)this.getOperatorConfig().getTimeCharacteristic(), (ProcessingTimeService)this.getProcessingTimeService(), (StreamTask)this.getContainingTask(), (Output)this.output, (long)this.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
        this.enqueueProcessSplits();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.inputSplitsState.clear();
        this.inputSplitsState.addAll(new ArrayList<MergeOnReadInputSplit>(this.splits));
    }

    public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
        this.splits.add((MergeOnReadInputSplit)element.getValue());
        this.enqueueProcessSplits();
    }

    private void enqueueProcessSplits() {
        if (this.currentSplitState == SplitState.IDLE && !this.splits.isEmpty()) {
            this.currentSplitState = SplitState.RUNNING;
            this.executor.execute(this::processSplits, "process input split");
        }
    }

    private void processSplits() throws IOException {
        MergeOnReadInputSplit split = this.splits.peek();
        if (split == null) {
            this.currentSplitState = SplitState.IDLE;
            return;
        }
        if (this.format.isClosed()) {
            LOG.info("Processing input split : {}", (Object)split);
            this.format.open(split);
        }
        try {
            this.consumeAsMiniBatch(split);
        }
        finally {
            this.currentSplitState = SplitState.IDLE;
        }
        this.enqueueProcessSplits();
    }

    private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
        for (int i = 0; i < 2048; ++i) {
            if (this.format.reachedEnd()) {
                this.format.close();
                this.splits.poll();
                break;
            }
            this.sourceContext.collect((Object)this.format.nextRecord(null));
            split.consume();
        }
    }

    public void processWatermark(Watermark mark) {
    }

    public void close() throws Exception {
        super.close();
        if (this.format != null) {
            this.format.close();
            this.format.closeInputFormat();
            this.format = null;
        }
        this.sourceContext = null;
    }

    public void finish() throws Exception {
        super.finish();
        this.output.close();
        if (this.sourceContext != null) {
            this.sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.sourceContext.close();
            this.sourceContext = null;
        }
    }

    public static OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory(MergeOnReadInputFormat format) {
        return new OperatorFactory(format);
    }

    private static class OperatorFactory
    extends AbstractStreamOperatorFactoryAdapter<RowData>
    implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
        private final MergeOnReadInputFormat format;

        private OperatorFactory(MergeOnReadInputFormat format) {
            this.format = format;
        }

        public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
            StreamReadOperator operator = new StreamReadOperator(this.format, this.processingTimeService, this.getMailboxExecutorAdapter());
            operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
            return (O)((Object)operator);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return StreamReadOperator.class;
        }
    }

    private static enum SplitState {
        IDLE,
        RUNNING;

    }
}

