package org.apache.beam.sdk.io.kudu;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.kudu.KuduIO;
import org.apache.beam.sdk.io.kudu.KuduService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AbstractKuduScannerBuilder;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduServiceImpl.class */
class KuduServiceImpl<T> implements KuduService<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KuduServiceImpl.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduServiceImpl$ReaderImpl.class */
    class ReaderImpl extends BoundedSource.BoundedReader<T> {
        private final KuduIO.KuduSource<T> source;
        private KuduClient client;
        private KuduScanner scanner;
        private RowResultIterator iter;
        private RowResult current;
        private long recordsReturned;

        ReaderImpl(KuduIO.KuduSource<T> kuduSource) {
            this.source = kuduSource;
        }

        public boolean start() throws IOException {
            KuduServiceImpl.LOG.debug("Starting Kudu reader");
            this.client = new AsyncKuduClient.AsyncKuduClientBuilder(this.source.spec.getMasterAddresses()).build().syncClient();
            if (this.source.serializedToken != null) {
                this.scanner = KuduScanToken.deserializeIntoScanner(this.source.serializedToken, this.client);
            } else {
                KuduTable openTable = this.client.openTable(this.source.spec.getTable());
                KuduScanner.KuduScannerBuilder newScannerBuilder = openTable.getAsyncClient().syncClient().newScannerBuilder(openTable);
                KuduServiceImpl.configureBuilder(this.source.spec, openTable.getSchema(), newScannerBuilder);
                this.scanner = newScannerBuilder.build();
            }
            return advance();
        }

        public T getCurrent() throws NoSuchElementException {
            if (this.current != null) {
                return (T) this.source.spec.getParseFn().apply(this.current);
            }
            throw new NoSuchElementException("No current record (Indicates misuse. Perhaps advance() was not called?)");
        }

        public boolean advance() throws KuduException {
            if (this.iter == null || (!this.iter.hasNext() && this.scanner.hasMoreRows())) {
                this.iter = this.scanner.nextRows();
            }
            if (this.iter == null || !this.iter.hasNext()) {
                return false;
            }
            this.current = this.iter.next();
            this.recordsReturned++;
            return true;
        }

        public void close() throws IOException {
            KuduServiceImpl.LOG.debug("Closing reader after reading {} records.", Long.valueOf(this.recordsReturned));
            if (this.scanner != null) {
                this.scanner.close();
                this.scanner = null;
            }
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public synchronized KuduIO.KuduSource m3getCurrentSource() {
            return this.source;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kudu/KuduServiceImpl$WriterImpl.class */
    class WriterImpl implements KuduService.Writer<T> {
        private final KuduIO.FormatFunction<T> formatFunction;
        private KuduClient client;
        private KuduSession session;
        private KuduTable table;

        WriterImpl(KuduIO.Write<T> write) throws KuduException {
            Preconditions.checkNotNull(write.masterAddresses(), "masterAddresses cannot be null");
            Preconditions.checkNotNull(write.table(), "table cannot be null");
            this.formatFunction = (KuduIO.FormatFunction) Preconditions.checkNotNull(write.formatFn(), "formatFn cannot be null");
            this.client = new AsyncKuduClient.AsyncKuduClientBuilder(write.masterAddresses()).build().syncClient();
            this.table = this.client.openTable(write.table());
        }

        @Override // org.apache.beam.sdk.io.kudu.KuduService.Writer
        public void openSession() throws KuduException {
            this.session = this.client.newSession();
            this.session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        }

        @Override // org.apache.beam.sdk.io.kudu.KuduService.Writer
        public void write(T t) throws KuduException {
            Preconditions.checkState(this.session != null, "must call openSession() before writing");
            this.session.apply((Operation) this.formatFunction.apply(new TableAndRecord(this.table, t)));
        }

        @Override // org.apache.beam.sdk.io.kudu.KuduService.Writer
        public void closeSession() throws Exception {
            try {
                this.session.close();
                if (this.session.countPendingErrors() > 0) {
                    KuduServiceImpl.LOG.error("At least {} errors occurred writing to Kudu", Integer.valueOf(this.session.countPendingErrors()));
                    RowError[] rowErrors = this.session.getPendingErrors().getRowErrors();
                    for (int i = 0; rowErrors != null && i < 3 && i < rowErrors.length; i++) {
                        KuduServiceImpl.LOG.error("Sample error: {}", rowErrors[i]);
                    }
                    throw new Exception("At least " + this.session.countPendingErrors() + " error(s) occurred writing to Kudu");
                }
            } finally {
                this.session = null;
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.client.close();
            this.client = null;
        }
    }

    @Override // org.apache.beam.sdk.io.kudu.KuduService
    public KuduService.Writer createWriter(KuduIO.Write<T> write) throws KuduException {
        return new WriterImpl(write);
    }

    @Override // org.apache.beam.sdk.io.kudu.KuduService
    public BoundedSource.BoundedReader createReader(KuduIO.KuduSource kuduSource) {
        return new ReaderImpl(kuduSource);
    }

    @Override // org.apache.beam.sdk.io.kudu.KuduService
    public List<byte[]> createTabletScanners(KuduIO.Read read) throws KuduException {
        KuduClient kuduClient = getKuduClient(read.getMasterAddresses());
        Throwable th = null;
        try {
            try {
                KuduTable openTable = kuduClient.openTable(read.getTable());
                KuduScanToken.KuduScanTokenBuilder newScanTokenBuilder = kuduClient.newScanTokenBuilder(openTable);
                configureBuilder(read, openTable.getSchema(), newScanTokenBuilder);
                List<byte[]> list = (List) newScanTokenBuilder.build().stream().map(kuduScanToken -> {
                    Objects.requireNonNull(kuduScanToken);
                    return (byte[]) uncheckCall(kuduScanToken::serialize);
                }).collect(Collectors.toList());
                if (kuduClient != null) {
                    if (0 != 0) {
                        try {
                            kuduClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kuduClient.close();
                    }
                }
                return list;
            } finally {
            }
        } catch (Throwable th3) {
            if (kuduClient != null) {
                if (th != null) {
                    try {
                        kuduClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kuduClient.close();
                }
            }
            throw th3;
        }
    }

    private synchronized KuduClient getKuduClient(List<String> list) {
        return new AsyncKuduClient.AsyncKuduClientBuilder(list).build().syncClient();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T2> void configureBuilder(KuduIO.Read<T2> read, Schema schema, AbstractKuduScannerBuilder abstractKuduScannerBuilder) {
        abstractKuduScannerBuilder.cacheBlocks(true);
        if (read.getBatchSize() != null) {
            abstractKuduScannerBuilder.batchSizeBytes(read.getBatchSize().intValue());
        }
        if (read.getProjectedColumns() != null) {
            abstractKuduScannerBuilder.setProjectedColumnNames(read.getProjectedColumns());
        }
        if (read.getFaultTolerent() != null) {
            abstractKuduScannerBuilder.setFaultTolerant(read.getFaultTolerent().booleanValue());
        }
        if (read.getSerializablePredicates() != null) {
            Iterator<Common.ColumnPredicatePB> it = read.getSerializablePredicates().iterator();
            while (it.hasNext()) {
                abstractKuduScannerBuilder.addPredicate(KuduPredicate.fromPB(schema, it.next()));
            }
        }
    }

    private static <T> T uncheckCall(Callable<T> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }
}
