package org.apache.flink.connectors.kudu.connector.writer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
import org.apache.kudu.client.DeleteTableResponse;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes records of type {@code T} to a single Kudu table.
 *
 * <p>On construction the writer builds a {@link KuduClient} from the configured masters, opens a
 * {@link KuduSession} configured from the {@link KuduWriterConfig}, and opens (or, if requested by
 * the {@link KuduTableInfo}, creates) the target table. Each record is turned into Kudu
 * {@link Operation}s by the supplied {@link KuduOperationMapper}; row errors reported by the
 * session are forwarded to the {@link KuduFailureHandler}.
 *
 * @param <T> type of the records accepted by {@link #write(Object)}
 */
@Internal
public class KuduWriter<T> implements AutoCloseable {
    private final Logger log;
    private final KuduTableInfo tableInfo;
    private final KuduWriterConfig writerConfig;
    private final KuduFailureHandler failureHandler;
    private final KuduOperationMapper<T> operationMapper;
    private transient KuduClient client;
    private transient KuduSession session;
    private transient KuduTable table;

    /**
     * Creates a writer that reports failures to a {@link DefaultKuduFailureHandler}.
     *
     * @param kuduTableInfo description of the target table
     * @param kuduWriterConfig connection and session settings
     * @param kuduOperationMapper converts records into Kudu operations
     * @throws IOException if opening or creating the table fails
     */
    public KuduWriter(KuduTableInfo kuduTableInfo, KuduWriterConfig kuduWriterConfig, KuduOperationMapper<T> kuduOperationMapper) throws IOException {
        this(kuduTableInfo, kuduWriterConfig, kuduOperationMapper, new DefaultKuduFailureHandler());
    }

    /**
     * Creates a writer with an explicit failure handler.
     *
     * <p>If the session or table cannot be obtained, the already-created client (and session, if
     * any) is closed before the exception propagates, so a failed construction does not leak
     * client resources.
     *
     * @param kuduTableInfo description of the target table
     * @param kuduWriterConfig connection and session settings
     * @param kuduOperationMapper converts records into Kudu operations
     * @param kuduFailureHandler receives row errors reported by the session
     * @throws IOException if opening or creating the table fails
     * @throws RuntimeException if the table does not exist and must not be created
     */
    public KuduWriter(KuduTableInfo kuduTableInfo, KuduWriterConfig kuduWriterConfig, KuduOperationMapper<T> kuduOperationMapper, KuduFailureHandler kuduFailureHandler) throws IOException {
        this.log = LoggerFactory.getLogger(getClass());
        this.tableInfo = kuduTableInfo;
        this.writerConfig = kuduWriterConfig;
        this.failureHandler = kuduFailureHandler;
        this.operationMapper = kuduOperationMapper;

        boolean initialized = false;
        try {
            this.client = obtainClient();
            this.session = obtainSession();
            this.table = obtainTable();
            initialized = true;
        } finally {
            if (!initialized) {
                // The caller never receives an instance it could close, so release
                // whatever was created before the failure.
                closeResources();
            }
        }
    }

    /** Builds a Kudu client for the master addresses given in the writer configuration. */
    private KuduClient obtainClient() {
        return new KuduClient.KuduClientBuilder(this.writerConfig.getMasters()).build();
    }

    /** Opens a new session on the client and applies every session setting from the writer configuration. */
    private KuduSession obtainSession() {
        KuduSession newSession = this.client.newSession();
        newSession.setFlushMode(this.writerConfig.getFlushMode());
        newSession.setTimeoutMillis(this.writerConfig.getOperationTimeout());
        newSession.setMutationBufferSpace(this.writerConfig.getMaxBufferSize());
        newSession.setFlushInterval(this.writerConfig.getFlushInterval());
        newSession.setIgnoreAllDuplicateRows(this.writerConfig.isIgnoreDuplicate());
        newSession.setIgnoreAllNotFoundRows(this.writerConfig.isIgnoreNotFound());
        return newSession;
    }

    /**
     * Opens the configured table, creating it first when it is missing and the table info allows it.
     *
     * @return the opened or newly created table
     * @throws IOException if the client call fails
     * @throws RuntimeException if the table is missing and must not be created
     */
    private KuduTable obtainTable() throws IOException {
        String name = this.tableInfo.getName();
        if (this.client.tableExists(name)) {
            return this.client.openTable(name);
        }
        if (this.tableInfo.getCreateTableIfNotExists()) {
            return this.client.createTable(name, this.tableInfo.getSchema(), this.tableInfo.getCreateTableOptions());
        }
        throw new RuntimeException("Table " + name + " does not exist.");
    }

    /**
     * Maps the record to Kudu operations and applies each of them to the session.
     *
     * <p>Pending session errors are checked before the first operation, and the response of every
     * applied operation is checked as well.
     *
     * @param t the record to write
     * @throws IOException if applying an operation fails or the failure handler raises one
     */
    public void write(T t) throws IOException {
        checkAsyncErrors();
        for (Operation operation : this.operationMapper.createOperations(t, this.table)) {
            checkErrors(this.session.apply(operation));
        }
    }

    /**
     * Flushes the session, checking for pending row errors both before and after the flush.
     *
     * @throws IOException if the flush fails or the failure handler raises one
     */
    public void flushAndCheckErrors() throws IOException {
        checkAsyncErrors();
        flush();
        checkAsyncErrors();
    }

    /**
     * Deletes the table this writer has opened.
     *
     * @return the response returned by the Kudu client
     * @throws IOException if the client call fails
     */
    @VisibleForTesting
    public DeleteTableResponse deleteTable() throws IOException {
        return this.client.deleteTable(this.table.getName());
    }

    /**
     * Flushes outstanding operations and then closes the session and the client.
     *
     * <p>The session and client are closed even when the flush fails; errors raised while closing
     * them are logged rather than propagated, so a flush failure is never masked.
     *
     * @throws IOException if the final flush fails or the failure handler raises one
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            flushAndCheckErrors();
        } finally {
            closeResources();
        }
    }

    /** Closes the session and then the client, skipping whichever is {@code null} and logging any exception. */
    private void closeResources() {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (Exception e) {
            this.log.error("Error while closing session.", e);
        }
        try {
            if (this.client != null) {
                this.client.close();
            }
        } catch (Exception e) {
            this.log.error("Error while closing client.", e);
        }
    }

    /** Flushes the session. */
    private void flush() throws IOException {
        this.session.flush();
    }

    /**
     * Reports the row error carried by the response, if there is one; otherwise checks the
     * session for pending errors.
     *
     * @param operationResponse response returned by {@code session.apply}; may be {@code null}
     * @throws IOException if the failure handler raises one
     */
    private void checkErrors(OperationResponse operationResponse) throws IOException {
        if (operationResponse != null && operationResponse.hasRowError()) {
            this.failureHandler.onFailure(Arrays.asList(operationResponse.getRowError()));
        } else {
            checkAsyncErrors();
        }
    }

    /**
     * Hands all pending row errors of the session to the failure handler; does nothing when the
     * session reports no pending errors.
     *
     * @throws IOException if the failure handler raises one
     */
    private void checkAsyncErrors() throws IOException {
        if (this.session.countPendingErrors() == 0) {
            return;
        }
        this.failureHandler.onFailure(Arrays.asList(this.session.getPendingErrors().getRowErrors()));
    }
}
