package org.apache.iceberg.spark.source;

import java.util.Iterator;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/spark/source/StreamingWriter.class */
public class StreamingWriter extends Writer implements StreamWriter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingWriter.class);
    private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
    private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
    private final String queryId;
    private final OutputMode mode;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriter(SparkSession sparkSession, Table table, DataSourceOptions dataSourceOptions, String str, OutputMode outputMode, String str2, Schema schema, StructType structType) {
        super(sparkSession, table, dataSourceOptions, false, str2, schema, structType);
        this.queryId = str;
        this.mode = outputMode;
    }

    public void commit(long j, WriterCommitMessage[] writerCommitMessageArr) {
        LOG.info("Committing epoch {} for query {} in {} mode", new Object[]{Long.valueOf(j), this.queryId, this.mode});
        table().refresh();
        Long lastCommittedEpochId = getLastCommittedEpochId();
        if (lastCommittedEpochId != null && j <= lastCommittedEpochId.longValue()) {
            LOG.info("Skipping epoch {} for query {} as it was already committed", Long.valueOf(j), this.queryId);
            return;
        }
        if (this.mode != OutputMode.Complete()) {
            AppendFiles newFastAppend = table().newFastAppend();
            int i = 0;
            Iterator<DataFile> it = files(writerCommitMessageArr).iterator();
            while (it.hasNext()) {
                newFastAppend.appendFile(it.next());
                i++;
            }
            commit(newFastAppend, j, i, "streaming append");
            return;
        }
        OverwriteFiles newOverwrite = table().newOverwrite();
        newOverwrite.overwriteByRowFilter(Expressions.alwaysTrue());
        int i2 = 0;
        Iterator<DataFile> it2 = files(writerCommitMessageArr).iterator();
        while (it2.hasNext()) {
            newOverwrite.addFile(it2.next());
            i2++;
        }
        commit(newOverwrite, j, i2, "streaming complete overwrite");
    }

    private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long j, int i, String str) {
        snapshotUpdate.set(QUERY_ID_PROPERTY, this.queryId);
        snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(j));
        commitOperation(snapshotUpdate, i, str);
    }

    public void abort(long j, WriterCommitMessage[] writerCommitMessageArr) {
        abort(writerCommitMessageArr);
    }

    private Long getLastCommittedEpochId() {
        Snapshot currentSnapshot = table().currentSnapshot();
        Long l = null;
        while (true) {
            if (currentSnapshot == null) {
                break;
            }
            Map summary = currentSnapshot.summary();
            if (this.queryId.equals((String) summary.get(QUERY_ID_PROPERTY))) {
                l = Long.valueOf((String) summary.get(EPOCH_ID_PROPERTY));
                break;
            }
            Long parentId = currentSnapshot.parentId();
            currentSnapshot = parentId != null ? table().snapshot(parentId.longValue()) : null;
        }
        return l;
    }

    @Override // org.apache.iceberg.spark.source.Writer
    public /* bridge */ /* synthetic */ String toString() {
        return super.toString();
    }

    @Override // org.apache.iceberg.spark.source.Writer
    public /* bridge */ /* synthetic */ void abort(WriterCommitMessage[] writerCommitMessageArr) {
        super.abort(writerCommitMessageArr);
    }

    @Override // org.apache.iceberg.spark.source.Writer
    public /* bridge */ /* synthetic */ void commit(WriterCommitMessage[] writerCommitMessageArr) {
        super.commit(writerCommitMessageArr);
    }

    @Override // org.apache.iceberg.spark.source.Writer
    public /* bridge */ /* synthetic */ DataWriterFactory createWriterFactory() {
        return super.createWriterFactory();
    }
}
