/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utils.source;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;

public class ContinuousFileSource
implements ScanTableSource {
    private final ResolvedSchema tableSchema;
    private final Path path;
    private final Configuration conf;

    public ContinuousFileSource(ResolvedSchema tableSchema, Path path, Configuration conf) {
        this.tableSchema = tableSchema;
        this.path = path;
        this.conf = conf;
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProviderAdapter(){

            public boolean isBounded() {
                return false;
            }

            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                RowType rowType = (RowType)ContinuousFileSource.this.tableSchema.toSourceRowDataType().getLogicalType();
                return execEnv.addSource((SourceFunction)new BoundedSourceFunction(ContinuousFileSource.this.path, ContinuousFileSource.this.conf.getInteger(ContinuousFileSourceFactory.CHECKPOINTS))).name("continuous_file_source").setParallelism(1).map((MapFunction)JsonDeserializationFunction.getInstance((RowType)rowType), (TypeInformation)InternalTypeInfo.of((RowType)rowType));
            }
        };
    }

    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    public DynamicTableSource copy() {
        return new ContinuousFileSource(this.tableSchema, this.path, this.conf);
    }

    public String asSummaryString() {
        return "ContinuousFileSource";
    }

    public static class BoundedSourceFunction
    implements SourceFunction<String>,
    CheckpointListener {
        private final Path path;
        private List<String> dataBuffer;
        private final int checkpoints;
        private final AtomicInteger currentCP = new AtomicInteger(0);
        private volatile boolean isRunning = true;

        public BoundedSourceFunction(Path path, int checkpoints) {
            this.path = path;
            this.checkpoints = checkpoints;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> context) throws Exception {
            if (this.dataBuffer == null) {
                this.loadDataBuffer();
            }
            int oldCP = this.currentCP.get();
            boolean finish = false;
            while (this.isRunning) {
                int batchSize = this.dataBuffer.size() / this.checkpoints;
                int start = batchSize * oldCP;
                Object object = context.getCheckpointLock();
                synchronized (object) {
                    for (int i = start; i < start + batchSize; ++i) {
                        if (i >= this.dataBuffer.size()) {
                            finish = true;
                            break;
                        }
                        context.collect((Object)this.dataBuffer.get(i));
                    }
                }
                ++oldCP;
                while (this.currentCP.get() < oldCP) {
                    object = context.getCheckpointLock();
                    synchronized (object) {
                        context.getCheckpointLock().wait(10L);
                    }
                }
                if (!finish && this.isRunning) continue;
                return;
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        private void loadDataBuffer() {
            try {
                this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
            }
            catch (IOException e) {
                throw new RuntimeException("Read file " + this.path + " error", e);
            }
        }

        public void notifyCheckpointComplete(long l) {
            this.currentCP.incrementAndGet();
        }
    }
}

