package org.apache.flink.connectors.kudu.connector.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.LocatedTablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connectors/kudu/connector/reader/KuduReader.class */
public class KuduReader<T> implements AutoCloseable {
    private final Logger log;
    private final KuduTableInfo tableInfo;
    private final KuduReaderConfig readerConfig;
    private List<KuduFilterInfo> tableFilters;
    private List<String> tableProjections;
    private final RowResultConvertor<T> rowResultConvertor;
    private final transient KuduClient client;
    private final transient KuduSession session;
    private final transient KuduTable table;

    public KuduReader(KuduTableInfo kuduTableInfo, KuduReaderConfig kuduReaderConfig, RowResultConvertor<T> rowResultConvertor) throws IOException {
        this(kuduTableInfo, kuduReaderConfig, rowResultConvertor, new ArrayList(), null);
    }

    public KuduReader(KuduTableInfo kuduTableInfo, KuduReaderConfig kuduReaderConfig, RowResultConvertor<T> rowResultConvertor, List<KuduFilterInfo> list) throws IOException {
        this(kuduTableInfo, kuduReaderConfig, rowResultConvertor, list, null);
    }

    public KuduReader(KuduTableInfo kuduTableInfo, KuduReaderConfig kuduReaderConfig, RowResultConvertor<T> rowResultConvertor, List<KuduFilterInfo> list, List<String> list2) throws IOException {
        this.log = LoggerFactory.getLogger(getClass());
        this.tableInfo = kuduTableInfo;
        this.readerConfig = kuduReaderConfig;
        this.tableFilters = list;
        this.tableProjections = list2;
        this.rowResultConvertor = rowResultConvertor;
        this.client = obtainClient();
        this.session = obtainSession();
        this.table = obtainTable();
    }

    public void setTableFilters(List<KuduFilterInfo> list) {
        this.tableFilters = list;
    }

    public void setTableProjections(List<String> list) {
        this.tableProjections = list;
    }

    private KuduClient obtainClient() {
        return new KuduClient.KuduClientBuilder(this.readerConfig.getMasters()).build();
    }

    private KuduSession obtainSession() {
        return this.client.newSession();
    }

    private KuduTable obtainTable() throws IOException {
        String name = this.tableInfo.getName();
        if (this.client.tableExists(name)) {
            return this.client.openTable(name);
        }
        if (this.tableInfo.getCreateTableIfNotExists()) {
            return this.client.createTable(name, this.tableInfo.getSchema(), this.tableInfo.getCreateTableOptions());
        }
        throw new RuntimeException("Table " + name + " does not exist.");
    }

    public KuduReaderIterator<T> scanner(byte[] bArr) throws IOException {
        return new KuduReaderIterator<>(KuduScanToken.deserializeIntoScanner(bArr, this.client), this.rowResultConvertor);
    }

    public List<KuduScanToken> scanTokens(List<KuduFilterInfo> list, List<String> list2, Integer num) {
        KuduScanToken.KuduScanTokenBuilder newScanTokenBuilder = this.client.newScanTokenBuilder(this.table);
        if (list2 != null) {
            newScanTokenBuilder.setProjectedColumnNames(list2);
        }
        if (CollectionUtils.isNotEmpty(list)) {
            Stream<R> map = list.stream().map(kuduFilterInfo -> {
                return kuduFilterInfo.toPredicate(this.table.getSchema());
            });
            Objects.requireNonNull(newScanTokenBuilder);
            map.forEach(newScanTokenBuilder::addPredicate);
        }
        if (num != null && num.intValue() > 0) {
            newScanTokenBuilder.limit(num.intValue());
        }
        return newScanTokenBuilder.build();
    }

    public KuduInputSplit[] createInputSplits(int i) throws IOException {
        List<KuduScanToken> scanTokens = scanTokens(this.tableFilters, this.tableProjections, Integer.valueOf(this.readerConfig.getRowLimit()));
        KuduInputSplit[] kuduInputSplitArr = new KuduInputSplit[scanTokens.size()];
        for (int i2 = 0; i2 < scanTokens.size(); i2++) {
            KuduScanToken kuduScanToken = scanTokens.get(i2);
            ArrayList arrayList = new ArrayList(kuduScanToken.getTablet().getReplicas().size());
            for (LocatedTablet.Replica replica : kuduScanToken.getTablet().getReplicas()) {
                arrayList.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
            }
            kuduInputSplitArr[i2] = new KuduInputSplit(kuduScanToken.serialize(), i2, (String[]) arrayList.toArray(new String[arrayList.size()]));
        }
        if (kuduInputSplitArr.length < i) {
            this.log.warn(" The minimum desired number of splits with your configured parallelism level is {}. Current kudu splits = {}. {} instances will remain idle.", new Object[]{Integer.valueOf(i), Integer.valueOf(kuduInputSplitArr.length), Integer.valueOf(i - kuduInputSplitArr.length)});
        }
        return kuduInputSplitArr;
    }

    private String getLocation(String str, Integer num) {
        return str + ":" + num;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (KuduException e) {
            this.log.error("Error while closing session.", e);
        }
        try {
            if (this.client != null) {
                this.client.close();
            }
        } catch (Exception e2) {
            this.log.error("Error while closing client.", e2);
        }
    }
}
