package org.apache.seatunnel.connectors.doris.sink.writer;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.class */
/**
 * Sink writer that serializes {@link SeaTunnelRow} records and pushes them to Doris through a
 * {@link DorisStreamLoad}.
 *
 * <p>Two threads touch an instance: the thread calling {@link #write(SeaTunnelRow)} (remembered in
 * {@code executorThread}) and the single "stream-load-check" scheduler thread running
 * {@link #checkDone()}. The checker reports a load that finished while the writer still considered
 * it active by storing {@code loadException} and interrupting the writer thread; the next
 * {@code write} call then fails with that exception as its cause.
 */
public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>, SupportMultiTableSinkWriter<Void> {
    /** Delay in milliseconds before the first run of the periodic load check. */
    private static final int INITIAL_DELAY = 200;
    private static final Logger log = LoggerFactory.getLogger(DorisSinkWriter.class);
    /** Stream-load statuses that {@link #flush()} accepts as a successful load. */
    private static final List<String> DORIS_SUCCESS_STATUS =
            Collections.unmodifiableList(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));

    private long lastCheckpointId;
    private DorisStreamLoad dorisStreamLoad;
    /** True between {@link #startLoad(String)} and {@link #flush()}; read by the checker thread. */
    volatile boolean loading;
    private final DorisConfig dorisConfig;
    private final String labelPrefix;
    private final LabelGenerator labelGenerator;
    private final int intervalTime;
    private final DorisSerializer serializer;
    private final CatalogTable catalogTable;
    private final ScheduledExecutorService scheduledExecutorService;
    /** Thread that last called {@code write}; volatile because the checker thread reads it. */
    private volatile Thread executorThread;
    /** Failure recorded by the checker thread and surfaced on the next {@code write}. */
    private volatile Exception loadException = null;

    /**
     * Creates the writer, restores the last checkpoint id from {@code list} (0 when empty), builds
     * the label prefix and immediately starts the first stream load.
     *
     * @param context     writer context; only the subtask index is read here
     * @param list        restored states; the first element's checkpoint id is used when present
     * @param catalogTable target table (its path and row type are used)
     * @param dorisConfig connector configuration
     * @param str         token embedded in the label prefix (presumably the job id; confirm with caller)
     * @throws DorisConnectorException if the initial stream load cannot be set up
     */
    public DorisSinkWriter(SinkWriter.Context context, List<DorisSinkState> list, CatalogTable catalogTable, DorisConfig dorisConfig, String str) {
        this.dorisConfig = dorisConfig;
        this.catalogTable = catalogTable;
        this.lastCheckpointId = !list.isEmpty() ? list.get(0).getCheckpointId() : 0L;
        log.info("restore checkpointId {}", this.lastCheckpointId);
        log.info("labelPrefix {}", dorisConfig.getLabelPrefix());
        this.labelPrefix = dorisConfig.getLabelPrefix()
                + "_" + catalogTable.getTablePath().getFullName().replaceAll("\\.", "_")
                + "_" + str
                + "_" + context.getIndexOfSubtask();
        this.labelGenerator = new LabelGenerator(this.labelPrefix, dorisConfig.getEnable2PC().booleanValue());
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
        this.serializer = createSerializer(dorisConfig, catalogTable.getSeaTunnelRowType());
        this.intervalTime = dorisConfig.getCheckInterval().intValue();
        this.loading = false;
        initializeLoad();
    }

    /**
     * Creates the stream loader, aborts pre-committed transactions for the next checkpoint when
     * 2PC is enabled, starts the first load and schedules the periodic {@link #checkDone()} task.
     */
    private void initializeLoad() {
        try {
            this.dorisStreamLoad = new DorisStreamLoad(this.dorisConfig.getFrontends(), this.catalogTable.getTablePath(), this.dorisConfig, this.labelGenerator, new HttpUtil().getHttpClient());
            if (this.dorisConfig.getEnable2PC().booleanValue()) {
                this.dorisStreamLoad.abortPreCommit(this.labelPrefix, this.lastCheckpointId + 1);
            }
            startLoad(this.labelGenerator.generateLabel(this.lastCheckpointId + 1));
            this.scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, INITIAL_DELAY, this.intervalTime, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // The writer is unusable after a failed init; do not leave the checker executor behind.
            this.scheduledExecutorService.shutdownNow();
            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
        }
    }

    /**
     * Serializes one row and appends it to the current stream load. When 2PC is disabled and the
     * record count has reached the configured batch size, the load is flushed and a new one started.
     *
     * @throws IOException      if writing or flushing fails
     * @throws RuntimeException if the checker thread recorded a load failure earlier
     */
    @Override
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        checkLoadExceptionAndResetThread();
        SeaTunnelRow row = this.dorisConfig.isNeedsUnsupportedTypeCasting()
                ? UnsupportedTypeConverterUtils.convertRow(seaTunnelRow)
                : seaTunnelRow;
        byte[] serialized = this.serializer.serialize(row);
        if (serialized == null) {
            // The serializer produced nothing for this row; there is nothing to send.
            return;
        }
        this.dorisStreamLoad.writeRecord(serialized);
        boolean batchFull = this.dorisStreamLoad.getRecordCount() >= this.dorisConfig.getBatchSize();
        if (this.dorisConfig.getEnable2PC().booleanValue() || !batchFull) {
            return;
        }
        flush();
        startLoad(this.labelGenerator.generateLabel(this.lastCheckpointId));
    }

    /**
     * Flushes the current load and, when 2PC is enabled and a response was received, returns the
     * commit info (host/port, database, transaction id) for the committer.
     *
     * @return commit info, or empty when 2PC is disabled or the flush yielded no response
     */
    @Override
    public Optional<DorisCommitInfo> prepareCommit() throws IOException {
        RespContent response = flush();
        if (!this.dorisConfig.getEnable2PC().booleanValue() || response == null) {
            return Optional.empty();
        }
        return Optional.of(new DorisCommitInfo(this.dorisStreamLoad.getHostPort(), this.dorisStreamLoad.getDb(), response.getTxnId()));
    }

    /**
     * Stops the current stream load and validates its status.
     *
     * @return the load response, possibly {@code null} when {@code stopLoad} returns none
     * @throws DorisConnectorException if the response status is not one of the accepted statuses
     */
    private RespContent flush() throws IOException {
        // Clear the flag first so the checker does not treat the finishing load as unexpected.
        this.loading = false;
        Preconditions.checkState(this.dorisStreamLoad != null);
        RespContent response = this.dorisStreamLoad.stopLoad();
        if (response == null || DORIS_SUCCESS_STATUS.contains(response.getStatus())) {
            return response;
        }
        throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, String.format("stream load error: %s, see more in %s", response.getMessage(), response.getErrorURL()));
    }

    /**
     * Starts the load for the next checkpoint ({@code j + 1}) and records {@code j} as the last
     * completed checkpoint.
     *
     * @param j id of the checkpoint being snapshotted
     * @return a single state holding the label prefix and {@code j}
     */
    @Override
    public List<DorisSinkState> snapshotState(long j) {
        Preconditions.checkState(this.dorisStreamLoad != null);
        startLoad(this.labelGenerator.generateLabel(j + 1));
        this.lastCheckpointId = j;
        return Collections.singletonList(new DorisSinkState(this.labelPrefix, this.lastCheckpointId));
    }

    /** Starts a stream load with the given label and marks the writer as loading. */
    private void startLoad(String str) {
        this.dorisStreamLoad.startLoad(str);
        this.loading = true;
    }

    /**
     * When 2PC is enabled, aborts pre-committed transactions for the checkpoint after the last
     * recorded one; any failure is rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void abortPrepare() {
        if (!this.dorisConfig.getEnable2PC().booleanValue()) {
            return;
        }
        try {
            this.dorisStreamLoad.abortPreCommit(this.labelPrefix, this.lastCheckpointId + 1);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Periodic check run on the scheduler thread. If the pending load future has completed while
     * {@code loading} is still true, the load ended unexpectedly: the failure is recorded in
     * {@code loadException} and the writer thread, if known, is interrupted.
     */
    private void checkDone() {
        log.debug("start timer checker, interval {} ms", this.intervalTime);
        if (this.dorisStreamLoad.getPendingLoadFuture() == null || !this.dorisStreamLoad.getPendingLoadFuture().isDone()) {
            return;
        }
        if (!this.loading) {
            log.debug("not loading, skip timer checker");
            return;
        }
        String message;
        Exception failure;
        try {
            message = this.dorisStreamLoad.handlePreCommitResponse(this.dorisStreamLoad.getPendingLoadFuture().get()).getMessage();
            failure = new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, message);
        } catch (Exception e) {
            message = e.getMessage();
            // Keep the original exception as the cause instead of reducing it to its message.
            failure = new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
        }
        this.loadException = failure;
        log.error("stream load finished unexpectedly, interrupt worker thread! {}", message);
        // executorThread is only assigned by write(); it is still null if no row was written yet.
        // An exception escaping here would silently cancel all further runs of this periodic task.
        Thread worker = this.executorThread;
        if (worker != null) {
            worker.interrupt();
        }
    }

    /**
     * Fails fast if the checker recorded a load failure; otherwise remembers the calling thread so
     * the checker can interrupt it later.
     */
    private void checkLoadExceptionAndResetThread() {
        if (this.loadException != null) {
            throw new RuntimeException("error while loading data.", this.loadException);
        }
        this.executorThread = Thread.currentThread();
    }

    /**
     * Flushes the current load when 2PC is disabled, then stops the checker and closes the stream
     * loader. The cleanup runs even if the flush fails; in that case a failure while closing the
     * loader is attached to the flush failure as suppressed rather than replacing it.
     *
     * @throws IOException if the flush or closing the stream loader fails
     */
    @Override
    public void close() throws IOException {
        Exception flushFailure = null;
        try {
            if (!this.dorisConfig.getEnable2PC().booleanValue()) {
                flush();
            }
        } catch (Exception e) {
            flushFailure = e;
            throw e;
        } finally {
            if (this.scheduledExecutorService != null) {
                this.scheduledExecutorService.shutdownNow();
            }
            if (this.dorisStreamLoad != null) {
                try {
                    this.dorisStreamLoad.close();
                } catch (Exception closeFailure) {
                    if (flushFailure == null) {
                        throw closeFailure;
                    }
                    flushFailure.addSuppressed(closeFailure);
                }
            }
        }
    }

    /**
     * Builds the row serializer from the stream-load properties (format, lower-cased, and field
     * delimiter) and the enable-delete flag.
     */
    private DorisSerializer createSerializer(DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
        String format = dorisConfig.getStreamLoadProps().getProperty(LoadConstants.FORMAT_KEY).toLowerCase();
        String fieldDelimiter = dorisConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY);
        return new SeaTunnelRowSerializer(format, seaTunnelRowType, fieldDelimiter, dorisConfig.getEnableDelete().booleanValue());
    }
}
