package org.apache.seatunnel.connectors.seatunnel.iceberg.sink;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergFilesCommitter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.state.IcebergSinkState;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.IcebergWriterFactory;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.RecordWriter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.WriteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.class */
/**
 * Sink writer that forwards {@link SeaTunnelRow} records to a {@link RecordWriter} and exposes
 * the completed {@link WriteResult}s both as commit information ({@link #prepareCommit()}) and
 * as checkpoint state ({@link #snapshotState(long)}).
 *
 * <p>When constructed with previously snapshotted states, the write results carried by those
 * states are committed through the {@link IcebergFilesCommitter} before any new row is written.
 */
public class IcebergSinkWriter implements SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState>, SupportMultiTableSinkWriter<Void> {
    private static final Logger log = LoggerFactory.getLogger(IcebergSinkWriter.class);

    private final SinkConfig config;
    private final IcebergTableLoader icebergTableLoader;
    private final IcebergFilesCommitter filesCommitter;
    /** Write results completed since the last {@link #snapshotState(long)} call. */
    private final List<WriteResult> results = Lists.newArrayList();
    private final DataTypeChangeEventHandler dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
    private final String commitUser;

    /** Current row type; replaced when a schema change event is applied. */
    private SeaTunnelRowType rowType;
    private RecordWriter writer;
    /** Checkpoint id taken from the first restored state; stays 0 when nothing is restored. */
    private long checkpointId;

    /**
     * Creates the writer and, if restored states are supplied, commits their write results.
     *
     * @param icebergTableLoader loader handed to the files committer and the writer factory
     * @param sinkConfig sink configuration
     * @param seaTunnelRowType row type of the incoming records
     * @param list states to restore from; may be {@code null} or empty for a fresh start
     */
    public IcebergSinkWriter(IcebergTableLoader icebergTableLoader, SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType, List<IcebergSinkState> list) {
        this.config = sinkConfig;
        this.icebergTableLoader = icebergTableLoader;
        this.rowType = seaTunnelRowType;
        this.filesCommitter = IcebergFilesCommitter.of(sinkConfig, icebergTableLoader);
        tryCreateRecordWriter();

        if (list == null || list.isEmpty()) {
            // Fresh start: no state to adopt, so a new commit user is generated.
            this.commitUser = UUID.randomUUID().toString();
        } else {
            // Restore: commit user and checkpoint id come from the first state only.
            IcebergSinkState first = list.get(0);
            this.commitUser = first.getCommitUser();
            this.checkpointId = first.getCheckpointId();
            preCommit(list);
        }
    }

    /**
     * Commits the write results of every given state through the files committer.
     *
     * @param list restored states whose write results are committed in order
     */
    private void preCommit(List<IcebergSinkState> list) {
        for (IcebergSinkState state : list) {
            this.filesCommitter.doCommit(state.getWriteResults());
        }
    }

    /** Creates the record writer for the current row type if none exists yet. */
    private void tryCreateRecordWriter() {
        if (this.writer == null) {
            this.writer = new IcebergWriterFactory(this.icebergTableLoader, this.config).createWriter(this.rowType);
        }
    }

    /**
     * Creates a writer without restored state.
     *
     * @param sinkConfig sink configuration
     * @param catalogTable table whose row type the writer uses
     * @return a new writer
     */
    public static IcebergSinkWriter of(SinkConfig sinkConfig, CatalogTable catalogTable) {
        return of(sinkConfig, catalogTable, null);
    }

    /**
     * Creates a writer on an opened table loader, optionally restoring from earlier states.
     *
     * @param sinkConfig sink configuration
     * @param catalogTable table whose row type the writer uses
     * @param list states to restore from; may be {@code null}
     * @return a new writer
     */
    public static IcebergSinkWriter of(SinkConfig sinkConfig, CatalogTable catalogTable, List<IcebergSinkState> list) {
        IcebergTableLoader openedLoader = IcebergTableLoader.create(sinkConfig, catalogTable).open();
        return new IcebergSinkWriter(openedLoader, sinkConfig, catalogTable.getSeaTunnelRowType(), list);
    }

    /**
     * Writes one row using the current row type.
     *
     * @param seaTunnelRow row to write
     * @throws IOException declared by the writer contract
     */
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        tryCreateRecordWriter();
        this.writer.write(seaTunnelRow, this.rowType);
    }

    /**
     * Completes the record writer and returns its results as commit information. The same
     * results are also buffered so that the next {@link #snapshotState(long)} can include them.
     *
     * @return commit information wrapping the completed write results; never empty
     * @throws IOException declared by the writer contract
     */
    public Optional<IcebergCommitInfo> prepareCommit() throws IOException {
        List<WriteResult> completed = this.writer.complete();
        this.results.addAll(completed);
        return Optional.of(new IcebergCommitInfo(completed));
    }

    /**
     * Applies a schema change to the row type and to the record writer. Does nothing unless
     * table schema evolution is enabled in the sink configuration.
     *
     * @param schemaChangeEvent event describing the change
     * @throws IOException declared by the writer contract
     */
    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws IOException {
        if (!this.config.isTableSchemaEvolutionEnabled()) {
            return;
        }
        log.info("changed rowType before: {}", fieldsInfo(this.rowType));
        this.rowType = this.dataTypeChangeEventHandler.reset(this.rowType).apply(schemaChangeEvent);
        log.info("changed rowType after: {}", fieldsInfo(this.rowType));
        this.writer.applySchemaChange(this.rowType, schemaChangeEvent);
    }

    /**
     * Returns a single state holding the write results buffered since the previous snapshot,
     * then empties the buffer.
     *
     * @param j id of the checkpoint being snapshotted
     * @return a one-element list with the state for this checkpoint
     * @throws IOException declared by the writer contract
     */
    public List<IcebergSinkState> snapshotState(long j) throws IOException {
        // The state must get its own list: the buffer is cleared right below, and handing over
        // the buffer itself would empty the snapshot too if the state keeps the reference it is
        // given (IcebergSinkState is not visible here, so the copy is made unconditionally).
        // NOTE(review): with populated states, the restore path in the constructor will now
        // actually commit these results -- confirm doCommit tolerates results that a committer
        // may already have committed.
        List<WriteResult> pending = Lists.newArrayList(this.results);
        this.results.clear();
        return Collections.singletonList(new IcebergSinkState(pending, this.commitUser, j));
    }

    /** No-op: this writer takes no action when a prepared commit is aborted. */
    public void abortPrepare() {
    }

    /**
     * No-op.
     *
     * <p>NOTE(review): neither the record writer nor the table loader is released here --
     * confirm whether they hold resources that need closing.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
    }

    /**
     * Renders a row type as {@code name<type>} entries joined by {@code ", "} for logging.
     *
     * @param seaTunnelRowType row type to describe
     * @return the joined field descriptions
     */
    private static String fieldsInfo(SeaTunnelRowType seaTunnelRowType) {
        int fieldCount = seaTunnelRowType.getTotalFields();
        String[] described = new String[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            described[i] = String.format("%s<%s>", seaTunnelRowType.getFieldName(i), seaTunnelRowType.getFieldType(i));
        }
        return StringUtils.join(described, ", ");
    }
}
