package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.class */
public class ElasticsearchSinkWriter implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
    private final SinkWriter.Context context;
    private final int maxBatchSize;
    private final SeaTunnelRowSerializer seaTunnelRowSerializer;
    private final List<String> requestEsList;
    private EsRestClient esRestClient;
    private RetryUtils.RetryMaterial retryMaterial;
    private static final long DEFAULT_SLEEP_TIME_MS = 200;

    public ElasticsearchSinkWriter(SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, Config config, int i, int i2, List<ElasticsearchSinkState> list) {
        this.context = context;
        this.maxBatchSize = i;
        IndexInfo indexInfo = new IndexInfo(config);
        this.esRestClient = EsRestClient.createInstance(config);
        this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(ElasticsearchVersion.get(this.esRestClient.getClusterVersion()), indexInfo, seaTunnelRowType);
        this.requestEsList = new ArrayList(i);
        this.retryMaterial = new RetryUtils.RetryMaterial(i2, true, exc -> {
            return true;
        }, DEFAULT_SLEEP_TIME_MS);
    }

    public void write(SeaTunnelRow seaTunnelRow) {
        if (RowKind.UPDATE_BEFORE.equals(seaTunnelRow.getRowKind())) {
            return;
        }
        this.requestEsList.add(this.seaTunnelRowSerializer.serializeRow(seaTunnelRow));
        if (this.requestEsList.size() >= this.maxBatchSize) {
            bulkEsWithRetry(this.esRestClient, this.requestEsList);
        }
    }

    public Optional<ElasticsearchCommitInfo> prepareCommit() {
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
        return Optional.empty();
    }

    public void abortPrepare() {
    }

    public synchronized void bulkEsWithRetry(EsRestClient esRestClient, List<String> list) {
        try {
            RetryUtils.retryWithException(() -> {
                if (list.size() <= 0) {
                    return null;
                }
                BulkResponse bulk = esRestClient.bulk(String.join("\n", list) + "\n");
                if (bulk.isErrors()) {
                    throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, "bulk es error: " + bulk.getResponse());
                }
                return bulk;
            }, this.retryMaterial);
            list.clear();
        } catch (Exception e) {
            throw new ElasticsearchConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "ElasticSearch execute batch statement error", e);
        }
    }

    public void close() throws IOException {
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
        this.esRestClient.close();
    }
}
