package org.apache.flink.table.store.connector;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
import org.apache.flink.table.store.connector.sink.StoreSink;
import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
import org.apache.flink.table.store.connector.source.FileStoreSource;
import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Experimental
/* loaded from: input_file:org/apache/flink/table/store/connector/TableStore.class */
public class TableStore {
    /** Table options supplied at construction; also backs the {@code log.}-prefixed view from {@link #logOptions()}. */
    private final Configuration options;
    /** User name passed to the file store; a random UUID string unless replaced via {@link #withUser}. */
    private String user = UUID.randomUUID().toString();
    /** Indexes (into {@link #type}) of the partition fields; empty when no partitions were configured. */
    private int[] partitions = new int[0];
    /**
     * Indexes (into {@link #type}) of the primary key used for the file store. When both partitions
     * and primary keys are configured, {@code adjustIndexAndValidate()} rewrites this to the declared
     * primary key minus the partition fields.
     */
    private int[] primaryKeys = new int[0];
    /** Primary key indexes exactly as passed to {@link #withPrimaryKeys}, partition fields included. */
    private int[] logPrimaryKeys = new int[0];
    /** Row type of the table; stays {@code null} until {@link #withSchema} is called. */
    private RowType type;
    /** Identifier of the table; stays {@code null} until {@link #withTableIdentifier} is called. */
    private ObjectIdentifier tableIdentifier;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.table.store.connector.TableStore$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/table/store/connector/TableStore$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /**
         * Switch lookup table indexed by {@code FileStoreOptions.MergeEngine#ordinal()}.
         * {@code DEDUPLICATE} maps to 1 and {@code PARTIAL_UPDATE} maps to 2; every other slot keeps
         * the array default of 0.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$table$store$file$FileStoreOptions$MergeEngine = new int[FileStoreOptions.MergeEngine.values().length];

        static {
            final int[] switchMap = $SwitchMap$org$apache$flink$table$store$file$FileStoreOptions$MergeEngine;
            // Each constant is registered in its own guarded step, so a constant that cannot be
            // resolved leaves its slot at 0 without affecting the other entry.
            try {
                switchMap[FileStoreOptions.MergeEngine.DEDUPLICATE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                switchMap[FileStoreOptions.MergeEngine.PARTIAL_UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Deliberately ignored: the slot simply stays 0.
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/store/connector/TableStore$SinkBuilder.class */
    /**
     * Fluent builder that assembles the sink writing a {@link DataStream} of rows into this table.
     *
     * <p>The input stream must be supplied through {@link #withInput} before {@link #build()} is
     * called; every other setting is optional.
     */
    public class SinkBuilder {
        /** Rows to write; required by {@link #build()}. */
        private DataStream<RowData> input;

        /** Optional lock factory forwarded to the {@link StoreSink}. */
        @Nullable
        private CatalogLock.Factory lockFactory;

        /** Optional overwrite partition specification forwarded to the {@link StoreSink}. */
        @Nullable
        private Map<String, String> overwritePartition;

        /** Optional log sink provider forwarded to the {@link StoreSink}. */
        @Nullable
        private LogSinkProvider logSinkProvider;

        /** Optional parallelism applied to the bucket-partitioning transformation. */
        @Nullable
        private Integer parallelism;

        public SinkBuilder() {
        }

        /**
         * Sets the stream of rows to be written.
         *
         * @param input the stream to write
         * @return this builder
         */
        public SinkBuilder withInput(DataStream<RowData> input) {
            this.input = input;
            return this;
        }

        /**
         * Sets the lock factory handed to the sink.
         *
         * @param lockFactory the lock factory
         * @return this builder
         */
        public SinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
            this.lockFactory = lockFactory;
            return this;
        }

        /**
         * Sets the overwrite partition specification handed to the sink.
         *
         * @param overwritePartition the partition specification
         * @return this builder
         */
        public SinkBuilder withOverwritePartition(Map<String, String> overwritePartition) {
            this.overwritePartition = overwritePartition;
            return this;
        }

        /**
         * Sets the log sink provider handed to the sink.
         *
         * @param logSinkProvider the log sink provider
         * @return this builder
         */
        public SinkBuilder withLogSinkProvider(LogSinkProvider logSinkProvider) {
            this.logSinkProvider = logSinkProvider;
            return this;
        }

        /**
         * Sets the parallelism of the partitioning transformation.
         *
         * @param parallelism the parallelism, or {@code null} to leave it unset
         * @return this builder
         */
        public SinkBuilder withParallelism(@Nullable Integer parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        /**
         * Partitions the input by bucket and attaches the store sink to it.
         *
         * @return the sink produced by {@link GlobalCommittingSinkTranslator#translate}
         * @throws NullPointerException if no input stream was configured
         */
        public DataStreamSink<?> build() {
            // Fail fast with a readable message instead of an anonymous NPE after the file store
            // has already been built.
            Preconditions.checkNotNull(this.input, "Input DataStream should not be null.");

            FileStore fileStore = TableStore.this.buildFileStore();
            int bucketCount = TableStore.this.options.get(FileStoreOptions.BUCKET);

            BucketStreamPartitioner partitioner =
                    new BucketStreamPartitioner(
                            bucketCount,
                            TableStore.this.type,
                            TableStore.this.partitions,
                            TableStore.this.primaryKeys,
                            TableStore.this.logPrimaryKeys);
            PartitionTransformation<RowData> partitioned =
                    new PartitionTransformation<>(this.input.getTransformation(), partitioner);
            if (this.parallelism != null) {
                partitioned.setParallelism(this.parallelism);
            }

            DataStream<RowData> partitionedStream =
                    new DataStream<>(this.input.getExecutionEnvironment(), partitioned);
            return GlobalCommittingSinkTranslator.translate(
                    partitionedStream,
                    new StoreSink(
                            TableStore.this.tableIdentifier,
                            fileStore,
                            TableStore.this.partitions,
                            TableStore.this.primaryKeys,
                            TableStore.this.logPrimaryKeys,
                            bucketCount,
                            this.lockFactory,
                            this.overwritePartition,
                            this.logSinkProvider));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/store/connector/TableStore$SourceBuilder.class */
    /**
     * Fluent builder that assembles the {@link DataStreamSource} reading this table.
     *
     * <p>A {@link StreamExecutionEnvironment} must be supplied through {@link #withEnv} before
     * {@link #build()} is called; every other setting is optional.
     */
    public class SourceBuilder {
        /** Whether the source keeps reading; {@code false} yields a plain file source. */
        private boolean isContinuous = false;
        /** Environment the source is registered with; required by {@link #build()}. */
        private StreamExecutionEnvironment env;

        /** Optional projection; when set, the produced type is the projected row type. */
        @Nullable
        private int[][] projectedFields;

        /** Optional partition filter handed to the file source. */
        @Nullable
        private Predicate partitionPredicate;

        /** Optional field filter handed to the file source. */
        @Nullable
        private Predicate fieldPredicate;

        /** Optional log source provider; only consulted in continuous mode. */
        @Nullable
        private LogSourceProvider logSourceProvider;

        /** Optional parallelism applied to the created source. */
        @Nullable
        private Integer parallelism;

        public SourceBuilder() {
        }

        /** Sets the execution environment and returns this builder. */
        public SourceBuilder withEnv(StreamExecutionEnvironment streamExecutionEnvironment) {
            env = streamExecutionEnvironment;
            return this;
        }

        /** Sets the projection and returns this builder. */
        public SourceBuilder withProjection(int[][] iArr) {
            projectedFields = iArr;
            return this;
        }

        /** Sets the partition filter and returns this builder. */
        public SourceBuilder withPartitionPredicate(Predicate predicate) {
            partitionPredicate = predicate;
            return this;
        }

        /** Sets the field filter and returns this builder. */
        public SourceBuilder withFieldPredicate(Predicate predicate) {
            fieldPredicate = predicate;
            return this;
        }

        /** Switches continuous reading on or off and returns this builder. */
        public SourceBuilder withContinuousMode(boolean z) {
            isContinuous = z;
            return this;
        }

        /** Sets the log source provider and returns this builder. */
        public SourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
            this.logSourceProvider = logSourceProvider;
            return this;
        }

        /** Sets the source parallelism ({@code null} leaves it unset) and returns this builder. */
        public SourceBuilder withParallelism(@Nullable Integer num) {
            parallelism = num;
            return this;
        }

        /**
         * Creates the file-based source.
         *
         * @param continuous passed as {@code true} only when continuous mode is enabled
         * @param startFromLatest passed as {@code true} only when the log startup mode is LATEST
         */
        private FileStoreSource buildFileSource(boolean continuous, boolean startFromLatest) {
            // The file store is created first, matching the original argument evaluation order.
            FileStore fileStore = buildFileStore();
            boolean withoutPrimaryKey = primaryKeys.length == 0;
            Duration discoveryInterval = options.get(FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
            return new FileStoreSource(
                    fileStore,
                    withoutPrimaryKey,
                    continuous,
                    discoveryInterval.toMillis(),
                    startFromLatest,
                    projectedFields,
                    partitionPredicate,
                    fieldPredicate);
        }

        /** Picks the file, log, or hybrid source depending on mode and log startup option. */
        private Source<RowData, ?, ?> buildSource() {
            if (!isContinuous) {
                return buildFileSource(false, false);
            }

            if (primaryKeys.length > 0 && mergeEngine() == FileStoreOptions.MergeEngine.PARTIAL_UPDATE) {
                throw new ValidationException("Partial update continuous reading is not supported.");
            }

            LogOptions.LogStartupMode startupMode = logOptions().get(LogOptions.SCAN);
            if (logSourceProvider == null) {
                // No log system: read continuously from files.
                boolean fromLatest = startupMode == LogOptions.LogStartupMode.LATEST;
                return buildFileSource(true, fromLatest);
            }

            if (startupMode != LogOptions.LogStartupMode.FULL) {
                return logSourceProvider.createSource((Map) null);
            }

            // FULL startup: bounded file source first, then switch over to the log source.
            return HybridSource.builder(buildFileSource(false, false))
                    .addSource(new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED)
                    .build();
        }

        /**
         * Registers the source with the environment.
         *
         * @return the created source stream, with the configured parallelism applied if any
         * @throws IllegalArgumentException if no execution environment was configured
         */
        public DataStreamSource<RowData> build() {
            if (env == null) {
                throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
            }

            Source<RowData, ?, ?> source = buildSource();
            WatermarkStrategy<RowData> watermarks = WatermarkStrategy.noWatermarks();
            String sourceName = tableIdentifier.asSummaryString();
            LogicalType producedType =
                    Optional.ofNullable(projectedFields)
                            .map(Projection::of)
                            .map(projection -> projection.project(type))
                            .orElse(type);

            DataStreamSource<RowData> stream =
                    env.fromSource(source, watermarks, sourceName, InternalTypeInfo.of(producedType));
            if (parallelism != null) {
                stream.setParallelism(parallelism.intValue());
            }
            return stream;
        }
    }

    /**
     * Creates a table store backed by the given options.
     *
     * @param configuration the table options; stored as-is
     */
    public TableStore(Configuration configuration) {
        options = configuration;
    }

    /**
     * Replaces the default random user name.
     *
     * @param str the user name later passed to the file store
     * @return this table store
     */
    public TableStore withUser(String str) {
        user = str;
        return this;
    }

    /**
     * Sets the row type of the table.
     *
     * @param rowType the row type that partition and primary key indexes refer to
     * @return this table store
     */
    public TableStore withSchema(RowType rowType) {
        type = rowType;
        return this;
    }

    /**
     * Sets the partition field indexes and re-validates them against the primary key.
     *
     * @param iArr indexes of the partition fields within the row type
     * @return this table store
     * @throws IllegalStateException if the configured primary key does not cover the partition
     *     fields, or consists only of partition fields
     */
    public TableStore withPartitions(int[] iArr) {
        partitions = iArr;
        adjustIndexAndValidate();
        return this;
    }

    /**
     * Sets the primary key field indexes and re-validates them against the partitions.
     *
     * <p>The array is recorded both as the declared (log) primary key and as the file-store primary
     * key; validation may then strip partition fields from the latter.
     *
     * @param iArr indexes of the primary key fields within the row type
     * @return this table store
     * @throws IllegalStateException if the primary key does not cover the configured partition
     *     fields, or consists only of partition fields
     */
    public TableStore withPrimaryKeys(int[] iArr) {
        logPrimaryKeys = iArr;
        primaryKeys = iArr;
        adjustIndexAndValidate();
        return this;
    }

    /**
     * Sets the identifier of the table.
     *
     * @param objectIdentifier the table identifier
     * @return this table store
     */
    public TableStore withTableIdentifier(ObjectIdentifier objectIdentifier) {
        tableIdentifier = objectIdentifier;
        return this;
    }

    /** Returns {@code true} when at least one partition field is configured. */
    public boolean partitioned() {
        return partitions.length != 0;
    }

    /** Returns {@code true} when the table has no (file-store) primary key fields. */
    public boolean valueCountMode() {
        final boolean hasPrimaryKey = primaryKeys.length > 0;
        return !hasPrimaryKey;
    }

    /** Returns the field names of the configured row type. */
    public List<String> fieldNames() {
        return type.getFieldNames();
    }

    /** Returns the names of the partition fields, obtained by projecting the row type. */
    public List<String> partitionKeys() {
        RowType partitionType = TypeUtils.project(type, partitions);
        return partitionType.getFieldNames();
    }

    /** Returns the names of the file-store primary key fields, obtained by projecting the row type. */
    @VisibleForTesting
    List<String> primaryKeys() {
        RowType primaryKeyType = TypeUtils.project(type, primaryKeys);
        return primaryKeyType.getFieldNames();
    }

    /** Returns the options this table store was created with. */
    public Configuration options() {
        return options;
    }

    /** Returns a view of the table options restricted to keys under the {@code log.} prefix. */
    public Configuration logOptions() {
        final String logPrefix = "log.";
        return new DelegatingConfiguration(options, logPrefix);
    }

    /** Creates a new source builder bound to this table store. */
    public SourceBuilder sourceBuilder() {
        return this.new SourceBuilder();
    }

    /** Creates a new sink builder bound to this table store. */
    public SinkBuilder sinkBuilder() {
        return this.new SinkBuilder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Reads the merge engine configured under {@code FileStoreOptions.MERGE_ENGINE}. */
    public FileStoreOptions.MergeEngine mergeEngine() {
        FileStoreOptions.MergeEngine engine = options.get(FileStoreOptions.MERGE_ENGINE);
        return engine;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the {@link FileStore} for the current schema, partitions, and primary key.
     *
     * <ul>
     *   <li>Without primary key fields, the whole row type is used as the key type, the value type
     *       is a single non-null {@code BIGINT} field named {@code _VALUE_COUNT}, and values are
     *       merged with a {@link ValueCountMergeFunction}.
     *   <li>With primary key fields, the key type consists of the primary key fields renamed with a
     *       {@code _KEY_} prefix, the value type is the whole row type, and the merge function is
     *       chosen by the configured merge engine.
     * </ul>
     *
     * @return a new file store instance
     * @throws UnsupportedOperationException if the configured merge engine is neither
     *     {@code DEDUPLICATE} nor {@code PARTIAL_UPDATE}
     */
    public FileStore buildFileStore() {
        RowType partitionType = TypeUtils.project(this.type, this.partitions);
        RowType keyType;
        RowType valueType;
        MergeFunction mergeFunction;

        if (this.primaryKeys.length == 0) {
            keyType = this.type;
            valueType = RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
            mergeFunction = new ValueCountMergeFunction();
        } else {
            // Prefix key field names so they cannot collide with the value fields of the same name.
            List<RowType.RowField> keyFields =
                    TypeUtils.project(this.type, this.primaryKeys).getFields().stream()
                            .map(
                                    field ->
                                            new RowType.RowField(
                                                    "_KEY_" + field.getName(),
                                                    field.getType(),
                                                    field.getDescription().orElse(null)))
                            .collect(Collectors.toList());
            keyType = new RowType(keyFields);
            valueType = this.type;

            FileStoreOptions.MergeEngine engine = mergeEngine();
            switch (engine) {
                case DEDUPLICATE:
                    mergeFunction = new DeduplicateMergeFunction();
                    break;
                case PARTIAL_UPDATE:
                    // Partial update needs one getter per field of the full row.
                    List<LogicalType> fieldTypes = this.type.getChildren();
                    RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
                    for (int i = 0; i < fieldGetters.length; i++) {
                        fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
                    }
                    mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported merge engine: " + engine);
            }
        }

        return new FileStoreImpl(
                this.tableIdentifier, this.options, this.user, partitionType, keyType, valueType, mergeFunction);
    }

    /**
     * Recomputes the file-store primary key from the declared primary key and the partitions.
     *
     * <p>When both are non-empty, the declared primary key must contain every partition field and
     * at least one non-partition field; the file-store primary key then becomes the declared key
     * with the partition fields removed. When either is empty there is nothing to strip, so the
     * file-store primary key is reset to the declared key.
     *
     * @throws IllegalStateException if the declared primary key misses a partition field or
     *     consists only of partition fields; {@code primaryKeys} is left untouched in that case
     */
    private void adjustIndexAndValidate() {
        if (this.logPrimaryKeys.length == 0 || this.partitions.length == 0) {
            // Reset rather than just return, so that a key trimmed by an earlier call does not
            // linger after the partitions (or the key) have been cleared.
            this.primaryKeys = this.logPrimaryKeys;
            return;
        }

        List<Integer> declaredKeyIndexes = Arrays.stream(this.logPrimaryKeys).boxed().collect(Collectors.toList());
        List<Integer> partitionIndexes = Arrays.stream(this.partitions).boxed().collect(Collectors.toList());
        String declaredKeyText = describeFields(this.logPrimaryKeys, declaredKeyIndexes);
        String partitionText = describeFields(this.partitions, partitionIndexes);

        Preconditions.checkState(
                declaredKeyIndexes.containsAll(partitionIndexes),
                "Primary key constraint %s should include all partition fields %s",
                declaredKeyText,
                partitionText);

        // Validate on a local copy first so a failed check cannot leave primaryKeys empty.
        int[] trimmedKeys =
                Arrays.stream(this.logPrimaryKeys)
                        .filter(index -> !partitionIndexes.contains(index))
                        .toArray();
        Preconditions.checkState(
                trimmedKeys.length > 0,
                "Primary key constraint %s should not be same with partition fields %s, this will result in only one record in a partition",
                declaredKeyText,
                partitionText);

        this.primaryKeys = trimmedKeys;
    }

    /** Renders fields by name when the schema is known, otherwise by their raw indexes. */
    private String describeFields(int[] indexes, List<Integer> boxedIndexes) {
        if (this.type == null) {
            return boxedIndexes.toString();
        }
        return TypeUtils.project(this.type, indexes).getFieldNames().toString();
    }
}
