package org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.RowConverter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaChangeWrapper;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.class */
/**
 * {@link RecordWriter} that converts SeaTunnel rows into Iceberg records and writes them through
 * an Iceberg {@link TaskWriter}.
 *
 * <p>Each time the current task writer is flushed, its data files and delete files are captured
 * as a {@link WriteResult} and buffered until {@link #complete()} hands them to the caller.
 * Schema updates flush the current writer, apply the update to the table, and then recreate both
 * the task writer and the row converter.
 */
public class IcebergRecordWriter implements RecordWriter {
    private static final Logger log = LoggerFactory.getLogger(IcebergRecordWriter.class);

    private final Table table;
    private final SinkConfig config;
    private final IcebergWriterFactory writerFactory;

    /** Results of writers flushed since the last call to {@link #complete()}. */
    private final List<WriteResult> writerResults = Lists.newArrayList();

    /** Current task writer; {@code null} only between {@link #flush()} and {@link #resetWriter()}. */
    private TaskWriter<Record> writer;

    private RowConverter recordConverter;

    /**
     * Creates a record writer for the given table.
     *
     * @param table the Iceberg table records are written to
     * @param icebergWriterFactory factory used to create the underlying task writers
     * @param sinkConfig sink configuration passed to the converter and the writer factory
     */
    public IcebergRecordWriter(Table table, IcebergWriterFactory icebergWriterFactory, SinkConfig sinkConfig) {
        this.config = sinkConfig;
        this.table = table;
        this.recordConverter = new RowConverter(table, sinkConfig);
        this.writerFactory = icebergWriterFactory;
        // Must come last: createTaskWriter() reads writerFactory, table and config. Doing this in
        // a field initializer would run before those fields are assigned and fail with an NPE.
        this.writer = createTaskWriter();
    }

    /** Builds a new task writer for the table using the configured factory. */
    private TaskWriter<Record> createTaskWriter() {
        return this.writerFactory.createTaskWriter(this.table, this.config);
    }

    /**
     * Converts the row to an Iceberg record and writes it with the row's kind.
     *
     * <p>If the conversion reports pending schema changes, they are applied first and the row is
     * converted a second time with the recreated converter.
     *
     * @param seaTunnelRow the row to write
     * @param seaTunnelRowType the row type describing {@code seaTunnelRow}
     * @throws UncheckedIOException if the underlying task writer fails with an {@link IOException}
     */
    @Override
    public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
        SchemaChangeWrapper pendingChanges = new SchemaChangeWrapper();
        Record record = this.recordConverter.convert(seaTunnelRow, seaTunnelRowType, pendingChanges);
        if (!pendingChanges.empty()) {
            applySchemaUpdate(pendingChanges);
            // applySchemaUpdate() replaced the converter, so convert again with the new one.
            record = this.recordConverter.convert(seaTunnelRow, seaTunnelRowType);
        }
        try {
            this.writer.write(new IcebergRecord(record, seaTunnelRow.getRowKind()));
        } catch (IOException e) {
            // Same wrapping as flush(); UncheckedIOException is still a RuntimeException.
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Translates a schema change event into table schema updates and applies them.
     *
     * <p>Drop-column events delete the column. Change-column events rename the old column when it
     * exists in the current table schema. Add-column and modify-column events are not acted on by
     * this method.
     *
     * @param seaTunnelRowType the row type accompanying the event (not used by this method)
     * @param schemaChangeEvent the event to apply
     */
    @Override
    public void applySchemaChange(SeaTunnelRowType seaTunnelRowType, SchemaChangeEvent schemaChangeEvent) {
        log.info("Apply schema change start.");
        SchemaChangeWrapper pendingChanges = new SchemaChangeWrapper();
        Schema schema = this.table.schema();
        if (schemaChangeEvent instanceof AlterTableDropColumnEvent) {
            pendingChanges.deleteColumn(((AlterTableDropColumnEvent) schemaChangeEvent).getColumn());
        } else if (schemaChangeEvent instanceof AlterTableAddColumnEvent) {
            // Nothing to do here for added columns.
        } else if (schemaChangeEvent instanceof AlterTableModifyColumnEvent) {
            // Nothing to do here for modified columns.
        } else if (schemaChangeEvent instanceof AlterTableChangeColumnEvent) {
            // NOTE(review): if AlterTableChangeColumnEvent is a subtype of AlterTableAddColumnEvent
            // or AlterTableModifyColumnEvent, this branch can never run - verify the event hierarchy.
            AlterTableChangeColumnEvent changeEvent = (AlterTableChangeColumnEvent) schemaChangeEvent;
            changeColumn(schema, changeEvent.getColumn(), changeEvent.getOldColumn(), pendingChanges);
        }
        if (!pendingChanges.empty()) {
            applySchemaUpdate(pendingChanges);
        }
        log.info("Apply schema change end.");
    }

    /**
     * Registers a rename from {@code oldColumnName} to the name of {@code column}, but only when
     * the old column is present in {@code schema}.
     */
    private void changeColumn(Schema schema, Column column, String oldColumnName, SchemaChangeWrapper pendingChanges) {
        if (schema.findField(oldColumnName) != null) {
            pendingChanges.changeColumn(oldColumnName, column.getName());
        }
    }

    /** Flushes the current writer, applies the schema updates to the table and recreates the writer. */
    private void applySchemaUpdate(SchemaChangeWrapper pendingChanges) {
        flush();
        SchemaUtils.applySchemaUpdates(this.table, pendingChanges);
        resetWriter();
    }

    /**
     * Flushes the current writer and returns every result accumulated since the previous call.
     *
     * <p>The internal buffer is cleared and a fresh writer is created, so this instance can keep
     * accepting rows afterwards.
     *
     * @return a new list holding the accumulated write results
     * @throws UncheckedIOException if completing the task writer fails with an {@link IOException}
     */
    @Override
    public List<WriteResult> complete() {
        flush();
        // Copy before clearing so the caller gets a list independent of the internal buffer.
        List<WriteResult> results = new ArrayList<>(this.writerResults);
        this.writerResults.clear();
        resetWriter();
        return results;
    }

    /** Replaces the task writer and the row converter with newly created instances. */
    private void resetWriter() {
        this.writer = createTaskWriter();
        this.recordConverter = new RowConverter(this.table, this.config);
    }

    /**
     * Completes the current task writer, if any, and buffers its data and delete files together
     * with the table's partition type. The writer reference is cleared afterwards, so a repeated
     * call is a no-op until {@link #resetWriter()} runs.
     *
     * @throws UncheckedIOException if completing the task writer fails with an {@link IOException}
     */
    private void flush() {
        if (this.writer == null) {
            return;
        }
        try {
            org.apache.iceberg.io.WriteResult completed = this.writer.complete();
            this.writerResults.add(
                    new WriteResult(
                            Arrays.asList(completed.dataFiles()),
                            Arrays.asList(completed.deleteFiles()),
                            this.table.spec().partitionType()));
            this.writer = null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Does nothing in this implementation.
     *
     * <p>NOTE(review): the current task writer is neither closed nor aborted here - confirm that
     * callers always invoke {@link #complete()} first or that leaving it open is intended.
     */
    @Override
    public void close() {
    }
}
