package org.apache.seatunnel.connectors.doris.sink.writer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.util.ResponseUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.class */
public class DorisStreamLoad implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final int HTTP_TEMPORARY_REDIRECT = 200;
    private final LabelGenerator labelGenerator;
    private final byte[] lineDelimiter;
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
    private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
    private static final String JOB_EXIST_FINISHED = "FINISHED";
    private final String loadUrlStr;
    private final String hostPort;
    private final String abortUrlStr;
    private final String user;
    private final String passwd;
    private final String db;
    private final String table;
    private final boolean enable2PC;
    private final boolean enableDelete;
    private final Properties streamLoadProp;
    private final RecordStream recordStream;
    private Future<CloseableHttpResponse> pendingLoadFuture;
    private final CloseableHttpClient httpClient;
    private String label;
    private volatile boolean loading = false;
    private long recordCount = 0;
    private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("stream-load-upload").build());
    private volatile boolean loadBatchFirstRecord = true;

    public DorisStreamLoad(String str, TablePath tablePath, DorisConfig dorisConfig, LabelGenerator labelGenerator, CloseableHttpClient closeableHttpClient) {
        this.hostPort = str;
        this.db = tablePath.getDatabaseName();
        this.table = tablePath.getTableName();
        this.user = dorisConfig.getUsername();
        this.passwd = dorisConfig.getPassword();
        this.labelGenerator = labelGenerator;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, str, this.db, this.table);
        this.abortUrlStr = String.format(ABORT_URL_PATTERN, str, this.db);
        this.enable2PC = dorisConfig.getEnable2PC().booleanValue();
        this.streamLoadProp = dorisConfig.getStreamLoadProps();
        this.enableDelete = dorisConfig.getEnableDelete().booleanValue();
        this.httpClient = closeableHttpClient;
        this.recordStream = new RecordStream(dorisConfig.getBufferSize().intValue(), dorisConfig.getBufferCount().intValue());
        this.lineDelimiter = this.streamLoadProp.getProperty(LoadConstants.LINE_DELIMITER_KEY, "\n").getBytes();
    }

    public void abortPreCommit(String str, long j) throws Exception {
        long j2 = j;
        log.info("abort for labelSuffix {}. start chkId {}.", str, Long.valueOf(j));
        while (true) {
            try {
                String generateLabel = this.labelGenerator.generateLabel(j2);
                HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
                httpPutBuilder.setUrl(this.loadUrlStr).baseAuth(this.user, this.passwd).addCommonHeader().enable2PC().setLabel(generateLabel).setEmptyEntity().addProperties(this.streamLoadProp);
                RespContent handlePreCommitResponse = handlePreCommitResponse(this.httpClient.execute((HttpUriRequest) httpPutBuilder.build()));
                Preconditions.checkState("true".equals(handlePreCommitResponse.getTwoPhaseCommit()));
                if (!LoadStatus.LABEL_ALREADY_EXIST.equals(handlePreCommitResponse.getStatus())) {
                    log.info("abort {} for check label {}.", Long.valueOf(handlePreCommitResponse.getTxnId()), generateLabel);
                    abortTransaction(handlePreCommitResponse.getTxnId());
                    log.info("abort for labelSuffix {} finished", str);
                    return;
                } else {
                    if (JOB_EXIST_FINISHED.equals(handlePreCommitResponse.getExistingJobStatus())) {
                        throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, "Load status is Label Already Exists and load job finished, change you label prefix or restore from latest savepoint!");
                    }
                    Matcher matcher = ResponseUtil.LABEL_EXIST_PATTERN.matcher(handlePreCommitResponse.getMessage());
                    if (!matcher.find()) {
                        throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, "Load Status is Label Already Exists, but no txnID associated with it!response: " + handlePreCommitResponse);
                    }
                    Preconditions.checkState(generateLabel.equals(matcher.group(1)));
                    long parseLong = Long.parseLong(matcher.group(2));
                    log.info("abort {} for exist label {}", Long.valueOf(parseLong), generateLabel);
                    abortTransaction(parseLong);
                    j2++;
                }
            } catch (Exception e) {
                log.warn("failed to stream load data", e);
                throw e;
            }
        }
    }

    public void writeRecord(byte[] bArr) throws IOException {
        if (this.loadBatchFirstRecord) {
            this.loadBatchFirstRecord = false;
            this.recordStream.startInput();
            startStreamLoad();
        } else {
            this.recordStream.write(this.lineDelimiter);
        }
        this.recordStream.write(bArr);
        this.recordCount++;
    }

    public String getLoadFailedMsg() {
        String message;
        if (!this.loading || getPendingLoadFuture() == null || !getPendingLoadFuture().isDone()) {
            return null;
        }
        try {
            message = handlePreCommitResponse(this.pendingLoadFuture.get()).getMessage();
        } catch (Exception e) {
            message = ExceptionUtils.getMessage(e);
        }
        this.recordStream.setErrorMessageByStreamLoad(message);
        return message;
    }

    private RespContent handlePreCommitResponse(CloseableHttpResponse closeableHttpResponse) throws Exception {
        if (closeableHttpResponse.getStatusLine().getStatusCode() != 200 || closeableHttpResponse.getEntity() == null) {
            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, closeableHttpResponse.getStatusLine().toString());
        }
        String entityUtils = EntityUtils.toString(closeableHttpResponse.getEntity());
        log.info("load Result {}", entityUtils);
        return (RespContent) OBJECT_MAPPER.readValue(entityUtils, RespContent.class);
    }

    public RespContent stopLoad() throws IOException {
        this.loading = false;
        if (this.pendingLoadFuture == null) {
            return null;
        }
        log.info("stream load stopped.");
        this.recordStream.endInput();
        try {
            try {
                RespContent handlePreCommitResponse = handlePreCommitResponse(this.pendingLoadFuture.get());
                this.pendingLoadFuture = null;
                return handlePreCommitResponse;
            } catch (Exception e) {
                throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
            }
        } catch (Throwable th) {
            this.pendingLoadFuture = null;
            throw th;
        }
    }

    public void startLoad(String str) {
        this.loadBatchFirstRecord = true;
        this.recordCount = 0L;
        this.label = str;
        this.loading = true;
    }

    private void startStreamLoad() {
        HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
        log.info("stream load started for {}", this.label);
        try {
            httpPutBuilder.setUrl(this.loadUrlStr).baseAuth(this.user, this.passwd).addCommonHeader().addHiddenColumns(this.enableDelete).setLabel(this.label).setEntity(new InputStreamEntity(this.recordStream)).addProperties(this.streamLoadProp);
            if (this.enable2PC) {
                httpPutBuilder.enable2PC();
            }
            this.pendingLoadFuture = this.executorService.submit(() -> {
                log.info("start execute load");
                return this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
            });
        } catch (Exception e) {
            log.warn("failed to stream load data with label: " + this.label, e);
            throw e;
        }
    }

    public void abortTransaction(long j) throws Exception {
        HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
        httpPutBuilder.setUrl(this.abortUrlStr).baseAuth(this.user, this.passwd).addCommonHeader().addTxnId(j).setEmptyEntity().abort();
        CloseableHttpResponse execute = this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
        if (execute.getStatusLine().getStatusCode() != 200 || execute.getEntity() == null) {
            log.warn("abort transaction response: " + execute.getStatusLine().toString());
            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, "Fail to abort transaction " + j + " with url " + this.abortUrlStr);
        }
        Map map = (Map) JsonUtils.parseObject(EntityUtils.toString(execute.getEntity()), new TypeReference<HashMap<String, String>>() { // from class: org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.1
        });
        if (LoadStatus.SUCCESS.equals(map.get("status"))) {
            return;
        }
        if (ResponseUtil.isCommitted((String) map.get("msg"))) {
            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, "try abort committed transaction, do you recover from old savepoint?");
        }
        log.warn("Fail to abort transaction. txnId: {}, error: {}", Long.valueOf(j), map.get("msg"));
    }

    public void close() throws IOException {
        if (null != this.httpClient) {
            try {
                this.httpClient.close();
            } catch (IOException e) {
                throw new IOException("Closing httpClient failed.", e);
            }
        }
        if (null != this.executorService) {
            this.executorService.shutdownNow();
        }
    }

    public String getHostPort() {
        return this.hostPort;
    }

    public String getDb() {
        return this.db;
    }

    public Future<CloseableHttpResponse> getPendingLoadFuture() {
        return this.pendingLoadFuture;
    }

    public long getRecordCount() {
        return this.recordCount;
    }
}
