package org.apache.iceberg.spark.extensions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkEnv;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestSparkExecutorCache.class */
public class TestSparkExecutorCache extends SparkExtensionsTestBase {
    private static final String UPDATES_VIEW_NAME = "updates";
    private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
    private static final Map<String, CustomInputFile> INPUT_FILES = Collections.synchronizedMap(Maps.newHashMap());
    private String targetTableName;
    private TableIdentifier targetTableIdent;

    /* loaded from: input_file:org/apache/iceberg/spark/extensions/TestSparkExecutorCache$CustomFileIO.class */
    public static class CustomFileIO implements FileIO {
        public InputFile newInputFile(String str) {
            return (InputFile) TestSparkExecutorCache.INPUT_FILES.computeIfAbsent(str, str2 -> {
                return new CustomInputFile(str);
            });
        }

        public OutputFile newOutputFile(String str) {
            return Files.localOutput(str);
        }

        public void deleteFile(String str) {
            if (!new File(str).delete()) {
                throw new RuntimeIOException("Failed to delete file: " + str, new Object[0]);
            }
        }
    }

    /* loaded from: input_file:org/apache/iceberg/spark/extensions/TestSparkExecutorCache$CustomInputFile.class */
    public static class CustomInputFile implements InputFile {
        private final InputFile delegate;
        private final AtomicInteger streamCount = new AtomicInteger();

        public CustomInputFile(String str) {
            this.delegate = Files.localInput(str);
        }

        public long getLength() {
            return this.delegate.getLength();
        }

        public SeekableInputStream newStream() {
            this.streamCount.incrementAndGet();
            return this.delegate.newStream();
        }

        public int streamCount() {
            return this.streamCount.get();
        }

        public String location() {
            return this.delegate.location();
        }

        public boolean exists() {
            return this.delegate.exists();
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        return new Object[]{new Object[]{"testhive", SparkCatalog.class.getName(), ImmutableMap.of("type", "hive", "io-impl", CustomFileIO.class.getName(), "default-namespace", "default")}};
    }

    public TestSparkExecutorCache(String str, String str2, Map<String, String> map) {
        super(str, str2, map);
    }

    @Before
    public void configureTargetTableName() {
        String str = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
        this.targetTableName = tableName(str);
        this.targetTableIdent = TableIdentifier.of(Namespace.of(new String[]{"default"}), str);
    }

    @After
    public void releaseResources() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.targetTableName});
        sql("DROP TABLE IF EXISTS %s", new Object[]{UPDATES_VIEW_NAME});
        INPUT_FILES.clear();
    }

    @Test
    public void testCopyOnWriteDelete() throws Exception {
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadDelete() throws Exception {
        checkDelete(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkDelete(RowLevelOperationMode rowLevelOperationMode) throws Exception {
        List<DeleteFile> createAndInitTable = createAndInitTable("write.delete.mode", rowLevelOperationMode);
        sql("DELETE FROM %s WHERE id = 1 OR id = 4", new Object[]{this.targetTableName});
        int i = rowLevelOperationMode == RowLevelOperationMode.COPY_ON_WRITE ? 3 : 1;
        Assertions.assertThat(createAndInitTable).allMatch(deleteFile -> {
            return streamCount(deleteFile) <= i;
        });
        assertEquals("Should have expected rows", ImmutableList.of(), sql("SELECT * FROM %s ORDER BY id ASC", new Object[]{this.targetTableName}));
    }

    @Test
    public void testCopyOnWriteUpdate() throws Exception {
        checkUpdate(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadUpdate() throws Exception {
        checkUpdate(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkUpdate(RowLevelOperationMode rowLevelOperationMode) throws Exception {
        List<DeleteFile> createAndInitTable = createAndInitTable("write.update.mode", rowLevelOperationMode);
        spark.createDataset(ImmutableList.of(1, 4), Encoders.INT()).createOrReplaceTempView(UPDATES_VIEW_NAME);
        sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)", new Object[]{this.targetTableName, UPDATES_VIEW_NAME});
        int i = rowLevelOperationMode == RowLevelOperationMode.COPY_ON_WRITE ? 5 : 1;
        Assertions.assertThat(createAndInitTable).allMatch(deleteFile -> {
            return streamCount(deleteFile) <= i;
        });
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{-1, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC", new Object[]{this.targetTableName}));
    }

    @Test
    public void testCopyOnWriteMerge() throws Exception {
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadMerge() throws Exception {
        checkMerge(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkMerge(RowLevelOperationMode rowLevelOperationMode) throws Exception {
        List<DeleteFile> createAndInitTable = createAndInitTable("write.merge.mode", rowLevelOperationMode);
        spark.createDataset(ImmutableList.of(1, 4), Encoders.INT()).createOrReplaceTempView(UPDATES_VIEW_NAME);
        sql("MERGE INTO %s t USING %s s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET id = 100 WHEN NOT MATCHED THEN   INSERT (id, dep) VALUES (-1, 'unknown')", new Object[]{this.targetTableName, UPDATES_VIEW_NAME});
        int i = rowLevelOperationMode == RowLevelOperationMode.COPY_ON_WRITE ? 3 : 1;
        Assertions.assertThat(createAndInitTable).allMatch(deleteFile -> {
            return streamCount(deleteFile) <= i;
        });
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{100, "hr"}), row(new Object[]{100, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC", new Object[]{this.targetTableName}));
    }

    private int streamCount(DeleteFile deleteFile) {
        return INPUT_FILES.get(deleteFile.path().toString()).streamCount();
    }

    private List<DeleteFile> createAndInitTable(String str, RowLevelOperationMode rowLevelOperationMode) throws Exception {
        sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')", new Object[]{this.targetTableName, "write.metadata.path", this.temp.toString().replaceFirst("file:", ""), "write.data.path", this.temp.toString().replaceFirst("file:", ""), str, rowLevelOperationMode.modeName()});
        append(this.targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
        append(this.targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));
        Table loadTable = this.validationCatalog.loadTable(this.targetTableIdent);
        Pair<DeleteFile, CharSequenceSet> writePosDeletes = writePosDeletes(loadTable, (List) dataFiles(loadTable).stream().map(dataFile -> {
            return Pair.of(dataFile.path(), 0L);
        }).collect(Collectors.toList()));
        DeleteFile deleteFile = (DeleteFile) writePosDeletes.first();
        CharSequenceSet charSequenceSet = (CharSequenceSet) writePosDeletes.second();
        DeleteFile writeEqDeletes = writeEqDeletes(loadTable, "id", 2, 5);
        loadTable.newRowDelta().validateFromSnapshot(loadTable.currentSnapshot().snapshotId()).validateDataFilesExist(charSequenceSet).addDeletes(deleteFile).addDeletes(writeEqDeletes).commit();
        sql("REFRESH TABLE %s", new Object[]{this.targetTableName});
        SparkEnv.get().blockManager().memoryStore().clear();
        return ImmutableList.of(deleteFile, writeEqDeletes);
    }

    private DeleteFile writeEqDeletes(Table table, String str, Object... objArr) throws IOException {
        Schema select = table.schema().select(new String[]{str});
        GenericRecord create = GenericRecord.create(select);
        ArrayList newArrayList = Lists.newArrayList();
        for (Object obj : objArr) {
            newArrayList.add(create.copy(str, obj));
        }
        return FileHelpers.writeDeleteFile(table, Files.localOutput(this.temp.newFile("eq-deletes-" + UUID.randomUUID())), (StructLike) null, newArrayList, select);
    }

    private Pair<DeleteFile, CharSequenceSet> writePosDeletes(Table table, List<Pair<CharSequence, Long>> list) throws IOException {
        return FileHelpers.writeDeleteFile(table, Files.localOutput(this.temp.newFile("pos-deletes-" + UUID.randomUUID())), (StructLike) null, list);
    }

    private void append(String str, Employee... employeeArr) throws NoSuchTableException {
        spark.createDataFrame(Arrays.asList(employeeArr), Employee.class).coalesce(1).writeTo(str).append();
    }

    private Collection<DataFile> dataFiles(Table table) {
        try {
            CloseableIterable planFiles = table.newScan().planFiles();
            Throwable th = null;
            try {
                try {
                    ImmutableList copyOf = ImmutableList.copyOf(Iterables.transform(planFiles, (v0) -> {
                        return v0.file();
                    }));
                    if (planFiles != null) {
                        if (0 != 0) {
                            try {
                                planFiles.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            planFiles.close();
                        }
                    }
                    return copyOf;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }
}
