package org.apache.flink.table.store.connector.source;

import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/store/connector/source/FlinkSourceBuilder.class */
/**
 * Fluent builder that turns a {@link FileStoreTable} into a Flink {@link DataStreamSource}.
 *
 * <p>Callers configure the builder through the {@code with*} methods and then call
 * {@link #build()}. A {@link StreamExecutionEnvironment} must be supplied via
 * {@link #withEnv(StreamExecutionEnvironment)} before building; every other setting is optional.
 */
public class FlinkSourceBuilder {
    /** Identifier of the table; its summary string is used as the source name. */
    private final ObjectIdentifier tableIdentifier;

    /** Table the source reads from. */
    private final FileStoreTable table;

    /** Configuration built from the options stored in the table schema. */
    private final Configuration conf;

    /** Whether a continuous source should be built instead of a static one. */
    private boolean isContinuous = false;

    /** Environment the source is registered with; must be set before {@link #build()}. */
    private StreamExecutionEnvironment env;

    @Nullable
    private int[][] projectedFields;

    @Nullable
    private Predicate predicate;

    @Nullable
    private LogSourceProvider logSourceProvider;

    @Nullable
    private Integer parallelism;

    @Nullable
    private Long limit;

    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    /**
     * Creates a builder for the given table.
     *
     * @param objectIdentifier identifier of the table
     * @param fileStoreTable table to read; its schema options seed the builder configuration
     */
    public FlinkSourceBuilder(ObjectIdentifier objectIdentifier, FileStoreTable fileStoreTable) {
        tableIdentifier = objectIdentifier;
        table = fileStoreTable;
        conf = Configuration.fromMap(fileStoreTable.schema().options());
    }

    /**
     * Selects between the continuous and the static source.
     *
     * @param z {@code true} to build a continuous source, {@code false} for a static one
     * @return this builder
     */
    public FlinkSourceBuilder withContinuousMode(boolean z) {
        isContinuous = z;
        return this;
    }

    /**
     * Sets the execution environment the source will be added to.
     *
     * @param streamExecutionEnvironment environment used by {@link #build()}
     * @return this builder
     */
    public FlinkSourceBuilder withEnv(StreamExecutionEnvironment streamExecutionEnvironment) {
        env = streamExecutionEnvironment;
        return this;
    }

    /**
     * Sets the field projection handed to the file sources and used to derive the produced type.
     *
     * @param iArr projected fields, or {@code null} for no projection
     * @return this builder
     */
    public FlinkSourceBuilder withProjection(int[][] iArr) {
        projectedFields = iArr;
        return this;
    }

    /**
     * Sets the predicate handed to the file sources.
     *
     * @param predicate filter predicate, or {@code null} for none
     * @return this builder
     */
    public FlinkSourceBuilder withPredicate(Predicate predicate) {
        this.predicate = predicate;
        return this;
    }

    /**
     * Sets the limit handed to the file sources.
     *
     * @param l limit value, or {@code null} for none
     * @return this builder
     */
    public FlinkSourceBuilder withLimit(@Nullable Long l) {
        limit = l;
        return this;
    }

    /**
     * Sets the provider used to create a log source in continuous mode.
     *
     * @param logSourceProvider log source provider, or {@code null} to read files only
     * @return this builder
     */
    public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
        this.logSourceProvider = logSourceProvider;
        return this;
    }

    /**
     * Sets the parallelism applied to the resulting stream.
     *
     * @param num parallelism, or {@code null} to leave the stream's parallelism untouched
     * @return this builder
     */
    public FlinkSourceBuilder withParallelism(@Nullable Integer num) {
        parallelism = num;
        return this;
    }

    /**
     * Sets the watermark strategy of the source.
     *
     * @param watermarkStrategy strategy to use, or {@code null} for
     *     {@link WatermarkStrategy#noWatermarks()}
     * @return this builder
     */
    public FlinkSourceBuilder withWatermarkStrategy(@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        return this;
    }

    /** Creates the static file source from the table, projection, predicate and limit. */
    private StaticFileStoreSource buildStaticFileSource() {
        return new StaticFileStoreSource(table, projectedFields, predicate, limit);
    }

    /** Creates the continuous file source from the table, projection, predicate and limit. */
    private ContinuousFileStoreSource buildContinuousFileSource() {
        return new ContinuousFileStoreSource(table, projectedFields, predicate, limit);
    }

    /**
     * Picks the concrete source for the current settings.
     *
     * <ul>
     *   <li>Not continuous: the static file source.
     *   <li>Continuous without a log source provider: the continuous file source.
     *   <li>Continuous with a log source provider and a startup mode other than
     *       {@code LATEST_FULL}: the source created by the provider.
     *   <li>Continuous with a log source provider and startup mode {@code LATEST_FULL}: a
     *       {@link HybridSource} that starts with the static file source and then adds the log
     *       source as a continuous, unbounded source.
     * </ul>
     *
     * <p>In continuous mode the table schema is validated first.
     */
    private Source<RowData, ?, ?> buildSource() {
        if (!isContinuous) {
            return buildStaticFileSource();
        }

        ContinuousDataFileSnapshotEnumerator.validate(table.schema());

        if (logSourceProvider == null) {
            return buildContinuousFileSource();
        }

        // The startup mode is only consulted once a log source provider is known to be present.
        if (CoreOptions.startupMode(conf) == CoreOptions.StartupMode.LATEST_FULL) {
            return HybridSource.builder(buildStaticFileSource())
                    .addSource(new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED)
                    .build();
        }

        return logSourceProvider.createSource((Map) null);
    }

    /**
     * Builds the source and registers it with the configured environment.
     *
     * <p>The source is named after the table identifier's summary string. Its produced type is the
     * table's logical row type, projected when a projection has been set. A configured parallelism
     * is applied to the resulting stream.
     *
     * @return the data stream source reading from the table
     * @throws IllegalArgumentException if no execution environment has been set
     */
    public DataStreamSource<RowData> build() {
        if (env == null) {
            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
        }

        // The locals below are computed in the same order as the original single expression
        // evaluated them, so any failure surfaces at the same point.
        RowType fullRowType = table.schema().logicalRowType();
        Source<RowData, ?, ?> source = buildSource();

        WatermarkStrategy<RowData> strategy = watermarkStrategy;
        if (strategy == null) {
            strategy = WatermarkStrategy.noWatermarks();
        }

        String sourceName = tableIdentifier.asSummaryString();

        // Kept typed as LogicalType so that InternalTypeInfo.of resolves exactly as before.
        LogicalType producedType = (LogicalType) Optional.ofNullable(projectedFields)
                .map(Projection::of)
                .map(projection -> projection.project(fullRowType))
                .orElse(fullRowType);

        DataStreamSource<RowData> stream =
                env.fromSource(source, strategy, sourceName, InternalTypeInfo.of(producedType));

        if (parallelism != null) {
            stream.setParallelism(parallelism.intValue());
        }
        return stream;
    }
}
