package org.apache.flink.connectors.kudu.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.connectors.kudu.format.KuduRowInputFormat;
import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink table source that reads {@link Row}s from a Kudu table through a
 * {@link KuduRowInputFormat}.
 *
 * <p>The source supports limit, projection and filter push-down. Each push-down
 * method returns a new {@code KuduTableSource} carrying the adjusted state; the
 * fields of the current instance are not reassigned.
 */
public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(KuduTableSource.class);

    /** Builder from which a reader configuration is built for every input format. */
    private final KuduReaderConfig.Builder configBuilder;

    /** Description of the Kudu table that is read. */
    private final KuduTableInfo tableInfo;

    /** Full (unprojected) schema reported by {@link #getTableSchema()}. */
    private final TableSchema flinkSchema;

    /** Names of the projected columns, or {@code null} when no projection was applied. */
    @Nullable
    private final String[] projectedFields;

    /** Filters handed to the input format, or {@code null} when none were supplied. */
    @Nullable
    private final List<KuduFilterInfo> predicates;

    /** {@code true} only when a non-empty predicate list was supplied at construction. */
    private final boolean isFilterPushedDown;

    // Built eagerly in the constructor but not read anywhere in this class;
    // getDataStream() creates its own instance. Kept so that any failure while
    // building the reader configuration still surfaces at construction time.
    private KuduRowInputFormat kuduRowInputFormat;

    /**
     * Creates a source for the given table.
     *
     * @param builder       builder for the Kudu reader configuration
     * @param kuduTableInfo description of the Kudu table
     * @param tableSchema   full Flink schema of the table
     * @param list          filters to push to Kudu; may be {@code null} or empty
     * @param strArr        names of the columns to read; {@code null} reads the whole schema
     */
    public KuduTableSource(KuduReaderConfig.Builder builder, KuduTableInfo kuduTableInfo, TableSchema tableSchema, List<KuduFilterInfo> list, String[] strArr) {
        this.configBuilder = builder;
        this.tableInfo = kuduTableInfo;
        this.flinkSchema = tableSchema;
        this.predicates = list;
        this.projectedFields = strArr;
        this.isFilterPushedDown = list != null && !list.isEmpty();
        this.kuduRowInputFormat = createInputFormat();
    }

    /**
     * Reports this source as bounded.
     *
     * @return always {@code true}
     */
    public boolean isBounded() {
        return true;
    }

    /**
     * Creates the input stream for this source. A fresh input format is built on every
     * call and the resulting operator is named after {@link #explainSource()}.
     *
     * @param streamExecutionEnvironment environment the input is registered with
     * @return stream of rows typed according to {@link #getProducedDataType()}
     */
    public DataStream<Row> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        return streamExecutionEnvironment
                .createInput(createInputFormat(), TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType()))
                .name(explainSource());
    }

    /**
     * @return the full table schema, regardless of any projection
     */
    public TableSchema getTableSchema() {
        return this.flinkSchema;
    }

    /**
     * @return {@code true} when this instance was created with at least one predicate
     */
    public boolean isFilterPushedDown() {
        return this.isFilterPushedDown;
    }

    /**
     * Returns the row type produced by this source: the full schema row type when no
     * projection is set, otherwise a row made of the projected columns in projection order.
     *
     * @return the produced row data type
     * @throws NoSuchElementException if a projected column is not part of the schema
     */
    public DataType getProducedDataType() {
        if (this.projectedFields == null) {
            return this.flinkSchema.toRowDataType();
        }
        DataTypes.Field[] fields = new DataTypes.Field[this.projectedFields.length];
        for (int i = 0; i < fields.length; i++) {
            String columnName = this.projectedFields[i];
            // Same exception type as an unchecked Optional.get(), but with a message
            // that tells which column is missing.
            TableColumn column = this.flinkSchema.getTableColumn(columnName)
                    .orElseThrow(() -> new NoSuchElementException(
                            "Projected field [" + columnName + "] is not part of the schema of KuduTable ["
                                    + this.tableInfo.getName() + "]."));
            fields[i] = DataTypes.FIELD(columnName, column.getType());
        }
        return DataTypes.ROW(fields);
    }

    /**
     * @return always {@code true}
     */
    public boolean isLimitPushedDown() {
        return true;
    }

    /**
     * Returns a copy of this source whose reader configuration carries the given row limit.
     *
     * <p>The reader configuration takes an {@code int} limit, so values above
     * {@link Integer#MAX_VALUE} are clamped instead of being narrowed with a bare cast,
     * which would wrap to a negative or otherwise arbitrary limit.
     *
     * @param j maximum number of rows to read
     * @return a new source with the limit applied
     */
    public TableSource<Row> applyLimit(long j) {
        int rowLimit = (int) Math.min(j, (long) Integer.MAX_VALUE);
        // NOTE(review): setRowLimit is invoked on the builder this instance also holds;
        // if the builder mutates in place the limit affects this source too — confirm.
        return new KuduTableSource(this.configBuilder.setRowLimit(rowLimit), this.tableInfo, this.flinkSchema, this.predicates, this.projectedFields);
    }

    /**
     * Returns a copy of this source that reads only the selected fields.
     *
     * @param iArr indices into the fields currently produced by this source (the projected
     *             fields if a projection is already set, otherwise the schema fields)
     * @return a new source projected onto the selected fields
     * @throws IndexOutOfBoundsException if an index is outside the current field range
     */
    public TableSource<Row> projectFields(int[] iArr) {
        String[] currentNames = currentFieldNames();
        String[] selected = new String[iArr.length];
        for (int i = 0; i < iArr.length; i++) {
            selected[i] = currentNames[iArr[i]];
        }
        return new KuduTableSource(this.configBuilder, this.tableInfo, this.flinkSchema, this.predicates, selected);
    }

    /**
     * Tries to convert each expression into a {@link KuduFilterInfo}. Expressions that can
     * be converted are removed from {@code list}; the rest are left in place.
     *
     * <p>The returned source carries only the filters converted by this call.
     *
     * @param list candidate predicates; modified in place
     * @return a new source holding the converted filters
     */
    public TableSource<Row> applyPredicate(List<Expression> list) {
        List<KuduFilterInfo> pushed = new ArrayList<>();
        ListIterator<Expression> candidates = list.listIterator();
        while (candidates.hasNext()) {
            Expression expression = candidates.next();
            Optional<KuduFilterInfo> converted = KuduTableUtils.toKuduFilterInfo(expression);
            if (converted != null && converted.isPresent()) {
                LOG.debug("Predicate [{}] converted into KuduFilterInfo and pushed into KuduTable [{}].", expression, this.tableInfo.getName());
                pushed.add(converted.get());
                candidates.remove();
            } else {
                LOG.debug("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].", expression, this.tableInfo.getName());
            }
        }
        return new KuduTableSource(this.configBuilder, this.tableInfo, this.flinkSchema, pushed, this.projectedFields);
    }

    /**
     * Describes this source, including the schema field names, the pushed predicates and,
     * when a projection is set, the projected fields.
     *
     * @return description string, also used as the operator name in {@link #getDataStream}
     */
    public String explainSource() {
        StringBuilder description = new StringBuilder("KuduTableSource[schema=")
                .append(Arrays.toString(getTableSchema().getFieldNames()))
                .append(", filter=")
                .append(predicateString());
        if (this.projectedFields != null) {
            description.append(", projectFields=").append(Arrays.toString(this.projectedFields));
        }
        return description.append("]").toString();
    }

    /** Renders the pushed predicates, or a fixed placeholder when there are none. */
    private String predicateString() {
        if (this.predicates == null || this.predicates.isEmpty()) {
            return "No predicates push down";
        }
        return "AND(" + this.predicates + ")";
    }

    /** Names of the fields currently produced: the projection if set, else the schema fields. */
    private String[] currentFieldNames() {
        return this.projectedFields != null ? this.projectedFields : this.flinkSchema.getFieldNames();
    }

    /** Builds an input format from the current configuration, filters and projection. */
    private KuduRowInputFormat createInputFormat() {
        List<KuduFilterInfo> filters = this.predicates == null ? Collections.emptyList() : this.predicates;
        List<String> columns = this.projectedFields == null ? null : Lists.newArrayList(this.projectedFields);
        return new KuduRowInputFormat(this.configBuilder.build(), new RowResultRowConvertor(), this.tableInfo, filters, columns);
    }
}
