package org.apache.paimon.flink.service;

import java.time.Duration;
import java.util.ArrayList;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.FileMonitorTable;
import org.apache.paimon.utils.SerializationUtils;

/* loaded from: input_file:org/apache/paimon/flink/service/QueryFileMonitor.class */
/**
 * Flink source function that repeatedly scans the {@link FileMonitorTable} view of a table and
 * emits every row the scan returns.
 *
 * <p>Each scan round runs while holding the source context's checkpoint lock. When a round
 * produces no rows, the source sleeps for the table's
 * {@link CoreOptions#CONTINUOUS_DISCOVERY_INTERVAL} before scanning again.
 */
public class QueryFileMonitor extends RichSourceFunction<InternalRow> {
    private static final long serialVersionUID = 1L;

    /** Table whose file changes are monitored; cast to {@link FileStoreTable} in {@link #open}. */
    private final Table table;

    /** Pause, in milliseconds, applied after a scan round that produced no rows. */
    private final long monitorInterval;

    /** Source context captured in {@link #run}; stays {@code null} until {@code run} starts. */
    private transient SourceFunction.SourceContext<InternalRow> ctx;

    private transient StreamTableScan scan;
    private transient TableRead read;

    /** Loop flag; cleared by {@link #cancel()} and checked by {@link #run}. */
    private volatile boolean isRunning = true;

    /**
     * Channel computer for rows emitted by this monitor.
     *
     * <p>NOTE(review): field 1 is read as a serialized binary row and field 2 as an int;
     * presumably these are the partition and bucket columns of
     * {@code FileMonitorTable.getRowType()} — confirm against that row type.
     */
    private static class FileMonitorChannelComputer implements ChannelComputer<InternalRow> {
        private int numChannels;

        private FileMonitorChannelComputer() {
        }

        /** Stores the number of downstream channels used by {@link #channel}. */
        @Override
        public void setup(int numChannels) {
            this.numChannels = numChannels;
        }

        /**
         * Picks the channel for a row by delegating to {@code ChannelComputer.select} with the
         * deserialized binary row from field 1, the int from field 2 and the channel count.
         */
        @Override
        public int channel(InternalRow row) {
            return ChannelComputer.select(
                    SerializationUtils.deserializeBinaryRow(row.getBinary(1)),
                    row.getInt(2),
                    this.numChannels);
        }
    }

    /**
     * Creates a monitor for the given table.
     *
     * @param table table to monitor; its options supply the discovery interval
     */
    public QueryFileMonitor(Table table) {
        this.table = table;
        Duration discoveryInterval =
                Options.fromMap(table.options()).get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
        this.monitorInterval = discoveryInterval.toMillis();
    }

    /**
     * Builds the streaming scan and the reader over the table's {@link FileMonitorTable}.
     *
     * @throws ClassCastException if the wrapped table is not a {@link FileStoreTable}
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) this.table);
        ReadBuilder readBuilder = monitorTable.newReadBuilder();
        this.scan = readBuilder.newStreamScan();
        this.read = readBuilder.newRead();
    }

    /**
     * Runs scan rounds until {@link #cancel()} clears the running flag.
     *
     * @param sourceContext context used to emit rows and to obtain the checkpoint lock
     */
    @Override
    public void run(SourceFunction.SourceContext<InternalRow> sourceContext) throws Exception {
        this.ctx = sourceContext;
        while (this.isRunning) {
            boolean nothingEmitted;
            synchronized (sourceContext.getCheckpointLock()) {
                // Re-check under the lock: cancel() flips the flag while holding the same lock.
                if (!this.isRunning) {
                    return;
                }
                nothingEmitted = doScan();
            }
            // Sleep outside the lock, and only when the last round found nothing.
            if (nothingEmitted) {
                Thread.sleep(this.monitorInterval);
            }
        }
    }

    /**
     * Plans one scan, reads all resulting rows into memory and then emits them.
     *
     * @return {@code true} if the round produced no rows
     */
    private boolean doScan() throws Exception {
        ArrayList<InternalRow> records = new ArrayList<>();
        RecordReader<InternalRow> reader = this.read.createReader(this.scan.plan());
        // Buffer everything first, then emit, so collection only starts after the read finished.
        reader.forEachRemaining(records::add);
        records.forEach(this.ctx::collect);
        return records.isEmpty();
    }

    /**
     * Stops the scan loop. Once {@link #run} has captured a source context, the flag is cleared
     * while holding the checkpoint lock, so it cannot change during a scan round in progress.
     */
    @Override
    public void cancel() {
        if (this.ctx == null) {
            // run() has not started yet, so there is no lock to take.
            this.isRunning = false;
            return;
        }
        synchronized (this.ctx.getCheckpointLock()) {
            this.isRunning = false;
        }
    }

    /**
     * Adds a {@code QueryFileMonitor} source for {@code table} to the environment.
     *
     * @param streamExecutionEnvironment environment the source is added to
     * @param table table to monitor
     * @return stream of rows typed by {@code FileMonitorTable.getRowType()}, from a source named
     *     {@code "FileMonitor-" + table.name()}
     */
    public static DataStream<InternalRow> build(StreamExecutionEnvironment streamExecutionEnvironment, Table table) {
        return streamExecutionEnvironment.addSource(
                new QueryFileMonitor(table),
                "FileMonitor-" + table.name(),
                InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()));
    }

    /** Returns a new channel computer for rows produced by this monitor. */
    public static ChannelComputer<InternalRow> createChannelComputer() {
        return new FileMonitorChannelComputer();
    }
}
