package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
import com.starrocks.connector.flink.table.data.StarRocksRowData;
import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorFactory;
import com.starrocks.connector.flink.table.sink.LingeringTransactionAborter;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.class */
public class StarRocksWriter<InputT> implements StatefulSink.StatefulSinkWriter<InputT, StarRocksWriterState>, TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, StarRocksCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class);
    private final StarRocksSinkOptions sinkOptions;
    private final RecordSerializationSchema<InputT> serializationSchema;
    private final StarRocksStreamLoadListener streamLoadListener;
    private final LabelGeneratorFactory labelGeneratorFactory;
    private final StreamLoadManagerV2 sinkManager;
    private long totalReceivedRows = 0;

    public StarRocksWriter(StarRocksSinkOptions starRocksSinkOptions, Sink.InitContext initContext, SerializationSchema.InitializationContext initializationContext, RecordSerializationSchema<InputT> recordSerializationSchema, StreamLoadProperties streamLoadProperties, Collection<StarRocksWriterState> collection) throws Exception {
        this.sinkOptions = starRocksSinkOptions;
        this.serializationSchema = recordSerializationSchema;
        this.serializationSchema.open(initializationContext, new DefaultStarRocksSinkContext(initContext, starRocksSinkOptions));
        this.streamLoadListener = new StarRocksStreamLoadListener(initContext.metricGroup(), starRocksSinkOptions);
        long orElse = initContext.getRestoredCheckpointId().orElse(0L);
        ArrayList arrayList = new ArrayList();
        Iterator<StarRocksWriterState> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next().getLabelSnapshots());
        }
        String labelPrefix = starRocksSinkOptions.getLabelPrefix();
        if (labelPrefix == null || starRocksSinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE || !starRocksSinkOptions.isEnableExactlyOnceLabelGen()) {
            this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(labelPrefix == null ? "flink" : labelPrefix);
        } else {
            this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory(labelPrefix, initContext.getNumberOfParallelSubtasks(), initContext.getSubtaskId(), orElse);
        }
        this.sinkManager = new StreamLoadManagerV2(streamLoadProperties, starRocksSinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
        this.sinkManager.setStreamLoadListener(this.streamLoadListener);
        this.sinkManager.setLabelGeneratorFactory(this.labelGeneratorFactory);
        try {
            this.sinkManager.init();
            try {
                if (starRocksSinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE && starRocksSinkOptions.isAbortLingeringTxns()) {
                    new LingeringTransactionAborter(starRocksSinkOptions.getLabelPrefix(), orElse, initContext.getSubtaskId(), starRocksSinkOptions.getAbortCheckNumTxns(), starRocksSinkOptions.getDbTables(), arrayList, this.sinkManager.getStreamLoader()).execute();
                }
                LOG.info("Create StarRocksWriter. {}", EnvUtils.getGitInformation());
            } catch (Exception e) {
                LOG.error("Failed to abort lingering transactions.", e);
                try {
                    this.sinkManager.close();
                } catch (Exception e2) {
                    LOG.error("Failed to close sink manager after aborting lingering transaction failure.", e2);
                }
                throw new RuntimeException("Failed to abort lingering transactions", e);
            }
        } catch (Exception e3) {
            LOG.error("Failed to init sink manager.", e3);
            try {
                this.sinkManager.close();
            } catch (Exception e4) {
                LOG.error("Failed to close sink manager after init failure.", e4);
            }
            throw new RuntimeException("Failed to init sink manager", e3);
        }
    }

    public void write(InputT inputt, SinkWriter.Context context) throws IOException, InterruptedException {
        StarRocksRowData serialize = this.serializationSchema.serialize(inputt);
        if (serialize == null) {
            return;
        }
        this.sinkManager.write(serialize.getUniqueKey(), serialize.getDatabase(), serialize.getTable(), serialize.getRow());
        this.totalReceivedRows++;
        if (this.totalReceivedRows % 100 == 1) {
            LOG.debug("Received raw record: {}", inputt);
            LOG.debug("Received serialized record: {}", serialize.getRow());
        }
    }

    public void flush(boolean z) throws IOException, InterruptedException {
        this.sinkManager.flush();
    }

    public Collection<StarRocksCommittable> prepareCommit() throws IOException, InterruptedException {
        if (this.sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
            return Collections.emptyList();
        }
        StreamLoadSnapshot snapshot = this.sinkManager.snapshot();
        if (this.sinkManager.prepare(snapshot)) {
            return Collections.singleton(new StarRocksCommittable(snapshot));
        }
        this.sinkManager.abort(snapshot);
        throw new RuntimeException("Snapshot state failed by prepare");
    }

    public List<StarRocksWriterState> snapshotState(long j) throws IOException {
        return (this.sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE && (this.labelGeneratorFactory instanceof ExactlyOnceLabelGeneratorFactory)) ? Collections.singletonList(new StarRocksWriterState(((ExactlyOnceLabelGeneratorFactory) this.labelGeneratorFactory).snapshot(j))) : Collections.emptyList();
    }

    public void close() throws Exception {
        LOG.info("Close StarRocksWriter");
        this.serializationSchema.close();
        if (this.sinkManager != null) {
            try {
                this.sinkManager.abort(this.sinkManager.snapshot());
            } finally {
                this.sinkManager.close();
            }
        }
    }
}
