package org.apache.seatunnel.connectors.selectdb.sink.writer;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import org.apache.seatunnel.connectors.selectdb.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.selectdb.serialize.SelectDBSerializer;
import org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.class */
public class SelectDBSinkWriter implements SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> {
    private static final Logger log = LoggerFactory.getLogger(SelectDBSinkWriter.class);
    private final SelectDBConfig selectdbConfig;
    private final long lastCheckpointId;
    private SelectDBStageLoad selectDBStageLoad;
    volatile boolean loading;
    private final String labelPrefix;
    private final byte[] lineDelimiter;
    private final LabelGenerator labelGenerator;
    private final SelectDBSinkState selectdbSinkState;
    private final SelectDBSerializer serializer;

    public SelectDBSinkWriter(SinkWriter.Context context, List<SelectDBSinkState> list, SeaTunnelRowType seaTunnelRowType, Config config, String str) {
        this.selectdbConfig = SelectDBConfig.loadConfig(config);
        this.lastCheckpointId = list.size() != 0 ? list.get(0).getCheckpointId() : 0L;
        log.info("restore checkpointId {}", Long.valueOf(this.lastCheckpointId));
        log.info("labelPrefix " + this.selectdbConfig.getLabelPrefix());
        this.selectdbSinkState = new SelectDBSinkState(this.selectdbConfig.getLabelPrefix(), this.lastCheckpointId);
        this.labelPrefix = this.selectdbConfig.getLabelPrefix() + "_" + str + "_" + context.getIndexOfSubtask();
        this.lineDelimiter = this.selectdbConfig.getStageLoadProps().getProperty(LoadConstants.LINE_DELIMITER_KEY, LoadConstants.LINE_DELIMITER_DEFAULT).getBytes();
        this.labelGenerator = new LabelGenerator(this.labelPrefix);
        this.serializer = createSerializer(this.selectdbConfig, seaTunnelRowType);
        this.loading = false;
    }

    public void initializeLoad(List<SelectDBSinkState> list) throws IOException {
        this.selectDBStageLoad = new SelectDBStageLoad(this.selectdbConfig, this.labelGenerator);
        this.selectDBStageLoad.setCurrentCheckpointID(this.lastCheckpointId + 1);
        this.serializer.open();
    }

    public synchronized void write(SeaTunnelRow seaTunnelRow) throws IOException {
        byte[] serialize = this.serializer.serialize(seaTunnelRow);
        if (Objects.isNull(serialize)) {
            return;
        }
        try {
            this.selectDBStageLoad.writeRecord(serialize);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws IOException {
        Preconditions.checkState(this.selectDBStageLoad != null);
        log.info("checkpoint arrived, upload buffer to storage");
        try {
            this.selectDBStageLoad.flush(true);
            return Optional.of(new SelectDBCommitInfo(this.selectDBStageLoad.getHostPort(), this.selectdbConfig.getClusterName(), new CopySQLBuilder(this.selectdbConfig, this.selectDBStageLoad.getFileList()).buildCopySQL()));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized List<SelectDBSinkState> snapshotState(long j) throws IOException {
        Preconditions.checkState(this.selectDBStageLoad != null);
        log.info("clear the file list {}", this.selectDBStageLoad.getFileList());
        this.selectDBStageLoad.clearFileList();
        this.selectDBStageLoad.setCurrentCheckpointID(j + 1);
        return Collections.singletonList(this.selectdbSinkState);
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        if (this.selectDBStageLoad != null) {
            this.selectDBStageLoad.close();
        }
        this.serializer.close();
    }

    public static SelectDBSerializer createSerializer(SelectDBConfig selectDBConfig, SeaTunnelRowType seaTunnelRowType) {
        return new SeaTunnelRowSerializer(selectDBConfig.getStageLoadProps().getProperty(LoadConstants.FORMAT_KEY).toLowerCase(), seaTunnelRowType, selectDBConfig.getStageLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY), selectDBConfig.getEnableDelete().booleanValue());
    }
}
