package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.SparkEnv;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.storage.BlockInfoManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BroadcastBlockId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import scala.Tuple2;

/* loaded from: input_file:org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.class */
public class TestRewriteTablePathsAction extends TestBase {

    // Temporary directory injected by JUnit via @TempDir.
    // NOTE(review): presumably backs stagingLocation(); that helper is outside this view - confirm.
    @TempDir
    private Path staging;

    // Temporary directory whose URI becomes tableLocation in setupTableLocation().
    @TempDir
    private Path tableDir;

    // NOTE(review): presumably backs newTableLocation() - confirm.
    @TempDir
    private Path newTableDir;

    // NOTE(review): presumably backs targetTableLocation() - confirm.
    @TempDir
    private Path targetTableDir;
    // Hadoop-based table factory used by the tests to create and load tables by location.
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    // Three optional columns: c1 (integer), c2 (string) and c3 (string).
    protected static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional(1, "c1", Types.IntegerType.get()), Types.NestedField.optional(2, "c2", Types.StringType.get()), Types.NestedField.optional(3, "c3", Types.StringType.get())});
    // URI string of tableDir; assigned before every test in setupTableLocation().
    protected String tableLocation = null;
    // Table created at tableLocation by setupTableLocation() (written to twice in append mode).
    private Table table = null;
    // Namespace names; the same values are created before and dropped after every test.
    private final String ns = "testns";
    private final String backupNs = "backupns";

    /**
     * Supplies the actions entry point exercised by these tests.
     *
     * @return the provider obtained from {@code SparkActions.get()}
     */
    protected ActionsProvider actions() {
        final ActionsProvider provider = SparkActions.get();
        return provider;
    }

    /**
     * Prepares the fixture for each test: resolves the table location from {@code tableDir},
     * creates the source table there and creates the test namespaces.
     */
    @BeforeEach
    public void setupTableLocation() {
        final URI tableUri = tableDir.toFile().toURI();
        tableLocation = tableUri.toString();
        table = createATableWith2Snapshots(tableLocation);
        createNameSpaces();
    }

    /** Removes the namespaces that were created for the test. */
    @AfterEach
    public void cleanupTableSetup() {
        this.dropNameSpaces();
    }

    /**
     * Convenience factory for a table that has been written to twice.
     *
     * @param str location at which the table is created
     * @return the created table handle
     */
    private Table createATableWith2Snapshots(String str) {
        final int writeCount = 2;
        return createTableWithSnapshots(str, writeCount);
    }

    /**
     * Creates a table with no extra table properties and writes to it {@code i} times.
     *
     * @param str location at which the table is created
     * @param i number of writes to perform
     * @return the created table handle
     */
    private Table createTableWithSnapshots(String str, int i) {
        Map<String, String> noProperties = Maps.newHashMap();
        return createTableWithSnapshots(str, i, noProperties);
    }

    /**
     * Creates a table with the given properties and writes to it {@code i} times in append mode.
     *
     * @param str location at which the table is created
     * @param i number of writes to perform
     * @param map table properties applied at creation time
     * @return the created table handle
     */
    protected Table createTableWithSnapshots(String str, int i, Map<String, String> map) {
        final String writeMode = "append";
        return createTableWithSnapshots(str, i, map, writeMode);
    }

    /**
     * Creates an unpartitioned table and writes the same single row to it repeatedly.
     *
     * @param str location at which the table is created
     * @param i number of writes to perform against the table
     * @param map table properties applied at creation time
     * @param str2 Spark save mode used for every write (callers in this class pass "append" or "overwrite")
     * @return the handle returned by table creation; it is not refreshed after the writes
     */
    private Table createTableWithSnapshots(String str, int i, Map<String, String> map, String str2) {
        Table created = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), map, str);
        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> singleRow = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        for (int written = 0; written < i; written++) {
            singleRow.select("c1", "c2", "c3").write().format("iceberg").mode(str2).save(str);
        }
        return created;
    }

    /**
     * Creates the primary and backup namespaces if they do not already exist.
     *
     * <p>The names come from the {@code ns} and {@code backupNs} fields so that they are defined
     * in exactly one place.
     */
    private void createNameSpaces() {
        sql("CREATE DATABASE IF NOT EXISTS %s", ns);
        sql("CREATE DATABASE IF NOT EXISTS %s", backupNs);
    }

    /**
     * Drops the primary and backup namespaces, cascading to anything they contain.
     *
     * <p>The names come from the {@code ns} and {@code backupNs} fields so that they are defined
     * in exactly one place.
     */
    private void dropNameSpaces() {
        sql("DROP DATABASE IF EXISTS %s CASCADE", ns);
        sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
    }

    /**
     * Rewrites the table up to {@code v3.metadata.json}, copies the rewritten files and verifies
     * that the copy lists two data files under the target prefix and returns the same rows as the
     * source table.
     */
    @Test
    public void testRewritePath() throws Exception {
        String targetLocation = targetTableLocation();

        // Sanity check on the source table before rewriting.
        List<String> sourcePaths = spark.read().format("iceberg").load(tableLocation + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        // hasSize (rather than size().isEqualTo) prints the offending paths when the check fails.
        Assertions.assertThat(sourcePaths).hasSize(2);

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, targetLocation)
                .endVersion("v3.metadata.json")
                .execute();
        Assertions.assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
        checkFileNum(3, 2, 2, 9, result);

        copyTableFiles(result);

        // Every data file of the copied table must live under the target prefix.
        List<String> copiedPaths = spark.read().format("iceberg").load(targetLocation + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(copiedPaths)
                .hasSize(2)
                .allMatch(path -> path.startsWith(targetLocation));

        assertEquals("Rows should match after copy", rows(tableLocation), rows(targetLocation));
    }

    /** Verifies that using the same value for source and target prefix is rejected. */
    @Test
    public void testSameLocations() {
        Assertions.assertThatThrownBy(
                        () -> actions().rewriteTablePath(table)
                                .rewriteLocationPrefix(tableLocation, tableLocation)
                                .endVersion("v1.metadata.json")
                                .execute())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Source prefix cannot be the same as target prefix");
    }

    /**
     * Rewrites starting from {@code v2.metadata.json} and verifies that the resulting file list
     * references the current snapshot exactly once and never references its parent.
     */
    @Test
    public void testStartVersion() throws Exception {
        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, targetTableLocation())
                .startVersion("v2.metadata.json")
                .execute();
        checkFileNum(1, 1, 1, 4, result);

        List<Tuple2<String, String>> pathPairs = readPathPairList(result.fileListLocation());

        // Only the second element of each pair is inspected for the snapshot id.
        String currentSnapshotId = String.valueOf(table.currentSnapshot().snapshotId());
        long currentMatches = pathPairs.stream()
                .filter(pair -> pair._2().contains(currentSnapshotId))
                .count();
        Assertions.assertThat(currentMatches)
                .withFailMessage("Should have the current snapshot file")
                .isEqualTo(1L);

        String parentSnapshotId = String.valueOf(table.currentSnapshot().parentId());
        long parentMatches = pathPairs.stream()
                .filter(pair -> pair._2().contains(parentSnapshotId))
                .count();
        Assertions.assertThat(parentMatches)
                .withFailMessage("Should NOT have the parent snapshot file")
                .isEqualTo(0L);
    }

    /**
     * Copies a one-row table in full, appends a second row to the source, then performs a second
     * rewrite that starts from the metadata version already present at the target, and finally
     * verifies that both tables contain the same rows.
     */
    @Test
    public void testIncrementalRewrite() throws Exception {
        String location = newTableLocation();
        Table sourceTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);

        List<ThreeColumnRecord> firstBatch = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(firstBatch, ThreeColumnRecord.class).coalesce(1)
                .select("c1", "c2", "c3")
                .write().format("iceberg").mode("append").save(location);
        Assertions.assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1L);

        // First pass: full rewrite and copy.
        RewriteTablePath.Result fullResult = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
                .execute();
        copyTableFiles(fullResult);
        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1L);

        List<ThreeColumnRecord> secondBatch = Lists.newArrayList(new ThreeColumnRecord(2, "BBBBBBBBB", "BBB"));
        spark.createDataFrame(secondBatch, ThreeColumnRecord.class).coalesce(1)
                .select("c1", "c2", "c3")
                .write().format("iceberg").mode("append").save(location);
        Assertions.assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2L);
        sourceTable.refresh();

        // Second pass: start from the version the target table is currently at.
        Table targetTable = TABLES.load(targetTableLocation());
        String startVersion = fileName(currentMetadata(targetTable).metadataFileLocation());
        RewriteTablePath.Result incrementalResult = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
                .startVersion(startVersion)
                .execute();
        copyTableFiles(incrementalResult);

        assertEquals(
                "Rows should match after copy",
                rowsSorted(location, "c1"),
                rowsSorted(targetTableLocation(), "c1"));
    }

    /**
     * Checks the expected file counts for a table written three times when the rewrite starts
     * from {@code v2.metadata.json} and from {@code v1.metadata.json} respectively.
     *
     * @param path temporary directory used as target of the first rewrite
     * @param path2 temporary directory used as target of the second rewrite
     */
    @Test
    public void testTableWith3Snapshots(@TempDir Path path, @TempDir Path path2) throws Exception {
        String location = newTableLocation();
        Table sourceTable = createTableWithSnapshots(location, 3);

        RewriteTablePath.Result fromV2 = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(location, toAbsolute(path))
                .startVersion("v2.metadata.json")
                .execute();
        checkFileNum(2, 2, 2, 8, fromV2);

        RewriteTablePath.Result fromV1 = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(location, toAbsolute(path2))
                .startVersion("v1.metadata.json")
                .execute();
        checkFileNum(3, 3, 3, 12, fromV1);
    }

    /** Checks the expected file counts when the whole table is rewritten without version bounds. */
    @Test
    public void testFullTableRewritePath() throws Exception {
        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, targetTableLocation())
                .execute();
        checkFileNum(3, 2, 2, 9, result);
    }

    /**
     * Deletes the first listed data file from the source table, rewrites and copies the table,
     * and verifies that the copy contains a single row.
     */
    @Test
    public void testDeleteDataFile() throws Exception {
        List<String> dataFilePaths = spark.read().format("iceberg").load(table.location() + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        String firstPath = dataFilePaths.stream().findFirst().get();
        table.newDelete().deleteFile(firstPath).commit();

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .stagingLocation(stagingLocation())
                .execute();
        checkFileNum(4, 3, 3, 12, result);
        copyTableFiles(result);

        long copiedRows = spark.read().format("iceberg").load(targetTableLocation())
                .as(Encoders.bean(ThreeColumnRecord.class))
                .count();
        Assertions.assertThat(copiedRows)
                .withFailMessage("There are only one row left since we deleted a data file")
                .isEqualTo(1L);
    }

    /**
     * Adds a position delete for row 0 of the data file added by the current snapshot, then
     * rewrites and copies the table and verifies that one row remains readable at the target.
     */
    @Test
    public void testPositionDeletes() throws Exception {
        String dataFilePath = table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
        List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(Pair.of(dataFilePath, 0L));

        // The delete file is written into a nested directory below the table location.
        File deleteFilePath = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
        OutputFile deleteOutput = table.io().newOutputFile(deleteFilePath.toURI().toString());
        DeleteFile positionDeletes = FileHelpers.writeDeleteFile(table, deleteOutput, deletes).first();
        table.newRowDelta().addDeletes(positionDeletes).commit();

        Assertions.assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1L);

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1L);
    }

    /**
     * Adds a position delete that also carries the deleted row, rewrites and copies the table,
     * and verifies both the row stored in the target's position deletes and the remaining row count.
     */
    @Test
    public void testPositionDeleteWithRow() throws Exception {
        String dataFilePath = table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();

        File deleteFilePath = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
        OutputFile deleteOutput = table.io().newOutputFile(deleteFilePath.toURI().toString());

        List<PositionDelete<?>> deletes = Lists.newArrayList();
        deletes.add(positionDelete(SCHEMA, dataFilePath, 0L, 1, "AAAAAAAAAA", "AAAA"));
        DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteOutput, (StructLike) null, deletes);
        table.newRowDelta().addDeletes(positionDeletes).commit();

        Assertions.assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1L);

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        // Column index 2 of the first position-deletes row holds the deleted row's values.
        Object[] expectedRow = new Object[] {1, "AAAAAAAAAA", "AAAA"};
        Object[] actualRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2];
        assertEquals("Position deletes should be equal", expectedRow, actualRow);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1L);
    }

    /**
     * Writes one delete file that removes row 0 of every data file added by any snapshot,
     * then verifies that both the source and the copied table read back empty.
     */
    @Test
    public void testPositionDeletesAcrossFiles() throws Exception {
        List<Pair<CharSequence, Long>> deletes =
                StreamSupport.stream(table.snapshots().spliterator(), false)
                        .flatMap(snap -> StreamSupport.stream(snap.addedDataFiles(table.io()).spliterator(), false))
                        .map(file -> Pair.of((CharSequence) file.location(), 0L))
                        .collect(Collectors.toList());
        Assertions.assertThat(deletes.size()).isEqualTo(2);

        File deleteFilePath = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet"));
        OutputFile deleteOutput = table.io().newOutputFile(deleteFilePath.toURI().toString());
        DeleteFile positionDeletes = FileHelpers.writeDeleteFile(table, deleteOutput, deletes).first();
        table.newRowDelta().addDeletes(positionDeletes).commit();

        Assertions.assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(0L);

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(0L);
    }

    /**
     * Appends four more rows to a one-row table, adds an equality delete on {@code c2} for two
     * values, rewrites and copies the table, and verifies that two rows remain at the target.
     */
    @Test
    public void testEqualityDeletes() throws Exception {
        Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);

        List<ThreeColumnRecord> extraRows = Lists.newArrayList(
                new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
                new ThreeColumnRecord(3, "BBBBBBBBBB", "BBBB"),
                new ThreeColumnRecord(4, "CCCCCCCCCC", "CCCC"),
                new ThreeColumnRecord(5, "DDDDDDDDDD", "DDDD"));
        spark.createDataFrame(extraRows, ThreeColumnRecord.class).coalesce(1)
                .select("c1", "c2", "c3")
                .write().format("iceberg").mode("append").save(newTableLocation());

        // Equality deletes are keyed on column c2 only.
        Schema deleteSchema = sourceTable.schema().select("c2");
        GenericRecord template = GenericRecord.create(deleteSchema);
        List<Record> deletedKeys = Lists.newArrayList(
                template.copy("c2", "AAAAAAAAAA"),
                template.copy("c2", "CCCCCCCCCC"));

        File deleteFilePath = new File(removePrefix(sourceTable.location()) + "/data/deeply/nested/file.parquet");
        OutputFile deleteOutput = sourceTable.io().newOutputFile(deleteFilePath.toURI().toString());
        DeleteFile equalityDeletes =
                FileHelpers.writeDeleteFile(sourceTable, deleteOutput, TestHelpers.Row.of(0), deletedKeys, deleteSchema);
        sourceTable.newRowDelta().addDeletes(equalityDeletes).commit();

        RewriteTablePath.Result result = actions().rewriteTablePath(sourceTable)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2L);
    }

    /**
     * Expires the snapshot that is current in {@code v2.metadata.json}, appends 100 more times,
     * and checks the expected file counts of a full rewrite afterwards.
     */
    @Test
    public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
        String location = newTableLocation();
        Table sourceTable = createTableWithSnapshots(location, 2);

        // Look up the snapshot to expire through a static view of the v2 metadata file.
        long snapshotToExpire =
                newStaticTable(location + "metadata/v2.metadata.json", table.io()).currentSnapshot().snapshotId();
        ExpireSnapshots.Result expireResult =
                actions().expireSnapshots(sourceTable).expireSnapshotId(snapshotToExpire).execute();
        Assertions.assertThat(expireResult.deletedManifestListsCount()).isEqualTo(1);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> singleRow = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        for (int round = 0; round < 100; round++) {
            singleRow.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
        }
        sourceTable.refresh();

        RewriteTablePath.Result result = actions().rewriteTablePath(sourceTable)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(location, targetTableLocation())
                .execute();
        // 2 initial writes + 100 appends.
        final int totalWrites = 102;
        checkFileNum(totalWrites - 1, totalWrites - 1, totalWrites, ((totalWrites * 4) - 1) - 1, result);
    }

    /** Checks the expected file counts when the rewrite ends at {@code v1.metadata.json}. */
    @Test
    public void testRewritePathWithoutSnapshot() throws Exception {
        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, newTableLocation())
                .endVersion("v1.metadata.json")
                .execute();
        checkFileNum(1, 0, 0, 1, result);
    }

    /** Expires the parent of the current snapshot and checks the file counts of a full rewrite. */
    @Test
    public void testExpireSnapshotBeforeRewrite() throws Exception {
        long parentSnapshotId = table.currentSnapshot().parentId().longValue();
        actions().expireSnapshots(table).expireSnapshotId(parentSnapshotId).execute();

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 1, 2, 9, result);
    }

    /**
     * Builds a table with three overwrite writes, expires the oldest snapshot, then rewrites and
     * copies the table. The data file added by the expired snapshot must not be listed in
     * {@code all_files} of the target, but must still appear in {@code all_entries} rows with
     * status 2, under the target prefix.
     */
    @Test
    public void testRewritePathWithNonLiveEntry() throws Exception {
        Table sourceTable = createTableWithSnapshots(newTableLocation(), 3, Maps.newHashMap(), "overwrite");

        Snapshot oldest = SnapshotUtil.oldestAncestor(sourceTable);
        DataFile fileAddedByOldest =
                Iterables.getOnlyElement(sourceTable.snapshot(oldest.snapshotId()).addedDataFiles(sourceTable.io()));
        // Where that file would live once its path is rewritten to the target location.
        String rewrittenPath =
                String.format("%sdata/%s", targetTableLocation(), fileName(fileAddedByOldest.location()));

        ExpireSnapshots.Result expireResult =
                actions().expireSnapshots(sourceTable).expireSnapshotId(oldest.snapshotId()).execute();
        Assertions.assertThat(expireResult)
                .as("Should deleted 1 data files in root snapshot")
                .extracting(
                        ExpireSnapshots.Result::deletedManifestListsCount,
                        ExpireSnapshots.Result::deletedManifestsCount,
                        ExpireSnapshots.Result::deletedDataFilesCount)
                .contains(1L, 1L, 1L);

        RewriteTablePath.Result result = actions().rewriteTablePath(sourceTable)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
                .execute();
        checkFileNum(5, 2, 4, 13, result);
        copyTableFiles(result);

        List<String> allFiles = spark.read().format("iceberg").load(targetTableLocation() + "#all_files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(allFiles).hasSize(2).doesNotContain(rewrittenPath);

        List<String> entriesWithStatus2 = spark.read().format("iceberg").load(targetTableLocation() + "#all_entries")
                .filter("status == 2")
                .select("data_file.file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(entriesWithStatus2).contains(rewrittenPath);
    }

    /**
     * Expires the parent snapshot so that a single snapshot remains, then checks the file counts
     * of a rewrite that starts from {@code v2.metadata.json}.
     */
    @Test
    public void testStartSnapshotWithoutValidSnapshot() throws Exception {
        long parentSnapshotId = table.currentSnapshot().parentId().longValue();
        actions().expireSnapshots(table).expireSnapshotId(parentSnapshotId).execute();
        Assertions.assertThat(table.snapshots()).hasSize(1);

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .stagingLocation(stagingLocation())
                .startVersion("v2.metadata.json")
                .execute();
        checkFileNum(2, 1, 1, 5, result);
    }

    /**
     * Expires the parent snapshot and checks the file counts of a rewrite that starts from
     * {@code v3.metadata.json}.
     */
    @Test
    public void testMoveTheVersionExpireSnapshot() throws Exception {
        long parentSnapshotId = table.currentSnapshot().parentId().longValue();
        actions().expireSnapshots(table).expireSnapshotId(parentSnapshotId).execute();

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .stagingLocation(stagingLocation())
                .startVersion("v3.metadata.json")
                .execute();
        checkFileNum(1, 0, 0, 1, result);
    }

    /**
     * Expires the parent snapshot and verifies that a rewrite ending at {@code v3.metadata.json}
     * fails with an {@link UnsupportedOperationException} that mentions invalid snapshots.
     */
    @Test
    public void testMoveVersionWithInvalidSnapshots() {
        long parentSnapshotId = table.currentSnapshot().parentId().longValue();
        actions().expireSnapshots(table).expireSnapshotId(parentSnapshotId).execute();

        Assertions.assertThatThrownBy(
                        () -> actions().rewriteTablePath(table)
                                .rewriteLocationPrefix(table.location(), newTableLocation())
                                .stagingLocation(stagingLocation())
                                .endVersion("v3.metadata.json")
                                .execute())
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. Please choose an earlier version without invalid snapshots.");
    }

    /**
     * Moves the current snapshot back to its parent, appends a row, moves the current snapshot
     * forward to the original one again, and checks the file counts of a full rewrite.
     */
    @Test
    public void testRollBack() throws Exception {
        long originalSnapshotId = table.currentSnapshot().snapshotId();
        long parentSnapshotId = table.currentSnapshot().parentId().longValue();

        table.manageSnapshots().setCurrentSnapshot(parentSnapshotId).commit();

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1)
                .select("c1", "c2", "c3")
                .write().format("iceberg").mode("append").save(table.location());
        table.refresh();

        table.manageSnapshots().setCurrentSnapshot(originalSnapshotId).commit();

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), newTableLocation())
                .stagingLocation(stagingLocation())
                .execute();
        checkFileNum(6, 3, 3, 15, result);
    }

    /**
     * Enables write-audit-publish on the table, appends a row while {@code spark.wap.id} is set
     * on the session, and checks the file counts of a full rewrite.
     *
     * <p>{@code spark.wap.id} is set on the session used by the other tests of this class, so it
     * is removed again in a {@code finally} block to keep it from leaking into later tests.
     */
    @Test
    public void testWriteAuditPublish() throws Exception {
        table.updateProperties().set("write.wap.enabled", "true").commit();
        spark.conf().set("spark.wap.id", "1");
        try {
            List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
            spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1)
                    .select("c1", "c2", "c3")
                    .write().format("iceberg").mode("append").save(table.location());
            table.refresh();

            RewriteTablePath.Result result = actions().rewriteTablePath(table)
                    .rewriteLocationPrefix(table.location(), newTableLocation())
                    .stagingLocation(stagingLocation())
                    .execute();
            checkFileNum(5, 3, 3, 14, result);
        } finally {
            // Restore the session so the WAP id does not affect subsequent tests.
            spark.conf().unset("spark.wap.id");
        }
    }

    /** Adds a string column {@code c4} to the table and checks the file counts of a full rewrite. */
    @Test
    public void testSchemaChange() throws Exception {
        table.updateSchema().addColumn("c4", Types.StringType.get()).commit();

        RewriteTablePath.Result result = actions().rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), newTableLocation())
                .stagingLocation(stagingLocation())
                .execute();
        checkFileNum(4, 2, 2, 10, result);
    }

    /**
     * Checks the file counts of a full rewrite for a table created with
     * {@code compatibility.snapshot-id-inheritance.enabled=true}.
     */
    @Test
    public void testSnapshotIdInheritanceEnabled() throws Exception {
        String location = newTableLocation();
        Map<String, String> properties = Maps.newHashMap();
        properties.put("compatibility.snapshot-id-inheritance.enabled", "true");
        Table sourceTable = createTableWithSnapshots(location, 2, properties);

        RewriteTablePath.Result result = actions().rewriteTablePath(sourceTable)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(location, targetTableLocation())
                .execute();
        checkFileNum(3, 2, 2, 9, result);
    }

    /**
     * Checks the file counts for a table created with gzip metadata compression, once ending at
     * {@code v2.gz.metadata.json} and once starting from {@code v1.gz.metadata.json}.
     */
    @Test
    public void testMetadataCompression() throws Exception {
        String location = newTableLocation();
        Map<String, String> properties = Maps.newHashMap();
        properties.put("write.metadata.compression-codec", "gzip");
        Table sourceTable = createTableWithSnapshots(location, 2, properties);

        RewriteTablePath.Result upToV2 = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(location, targetTableLocation())
                .endVersion("v2.gz.metadata.json")
                .execute();
        checkFileNum(2, 1, 1, 5, upToV2);

        RewriteTablePath.Result fromV1 = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(location, targetTableLocation())
                .startVersion("v1.gz.metadata.json")
                .execute();
        checkFileNum(2, 2, 2, 8, fromV1);
    }

    /**
     * Verifies that empty, blank or {@code null} arguments to the action's setters are rejected
     * with an {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testInvalidArgs() {
        RewriteTablePath action = actions().rewriteTablePath(table);

        // Source prefix
        Assertions.assertThatThrownBy(() -> action.rewriteLocationPrefix("", (String) null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Source prefix('') cannot be empty");
        Assertions.assertThatThrownBy(() -> action.rewriteLocationPrefix((String) null, (String) null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Source prefix('null') cannot be empty");

        // Staging location
        Assertions.assertThatThrownBy(() -> action.stagingLocation(""))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Staging location('') cannot be empty");
        Assertions.assertThatThrownBy(() -> action.stagingLocation((String) null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Staging location('null') cannot be empty");

        // Start version
        Assertions.assertThatThrownBy(() -> action.startVersion((String) null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Start version('null') cannot be empty");

        // End version
        Assertions.assertThatThrownBy(() -> action.endVersion(" "))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("End version(' ') cannot be empty");
        Assertions.assertThatThrownBy(() -> action.endVersion((String) null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("End version('null') cannot be empty");
    }

    /**
     * Overwrites the table's current metadata file with a copy that references a partition
     * statistics file and verifies that the rewrite is rejected as unsupported.
     */
    @Test
    public void testPartitionStatisticFile() throws IOException {
        String location = newTableLocation();
        Map<String, String> properties = Maps.newHashMap();
        properties.put("format-version", "2");
        Table sourceTable = createMetastoreTable(location, properties, "default", "v2tblwithPartStats", 0);

        TableMetadata metadata = currentMetadata(sourceTable);
        ImmutableGenericPartitionStatisticsFile partitionStats =
                ImmutableGenericPartitionStatisticsFile.builder()
                        .snapshotId(11L)
                        .path("/some/partition/stats/file.parquet")
                        .fileSizeInBytes(42L)
                        .build();
        TableMetadata withPartitionStats =
                TableMetadata.buildFrom(metadata).setPartitionStatistics(partitionStats).build();
        // Replace the metadata file in place so the table picks up the partition statistics entry.
        OutputFile metadataFile = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
        TableMetadataParser.overwrite(withPartitionStats, metadataFile);

        Assertions.assertThatThrownBy(
                        () -> actions().rewriteTablePath(sourceTable)
                                .rewriteLocationPrefix(location, targetTableLocation())
                                .execute())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Partition statistics files are not supported yet");
    }

    /**
     * Inserts a row and computes table statistics ten times, verifies that ten statistics files
     * are registered, and checks the file counts of a full rewrite.
     */
    @Test
    public void testTableWithManyStatisticFiles() throws IOException {
        final int iterations = 10;
        final String tableName = "v2tblwithmanystats";

        String location = newTableLocation();
        Map<String, String> properties = Maps.newHashMap();
        properties.put("format-version", "2");
        Table sourceTable = createMetastoreTable(location, properties, "default", tableName, 0);

        for (int round = 0; round < iterations; round++) {
            sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, round);
            sourceTable.refresh();
            actions().computeTableStats(sourceTable).execute();
        }
        sourceTable.refresh();
        Assertions.assertThat(sourceTable.statisticsFiles().size()).isEqualTo(iterations);

        RewriteTablePath.Result result = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(location, targetTableLocation())
                .execute();
        checkFileNum((iterations * 2) + 1, iterations, iterations, iterations, (iterations * 6) + 1, result);
    }

    /**
     * Checks the file counts for a metastore table with gzip metadata compression, using entries
     * of the metadata log as end version and start version respectively.
     */
    @Test
    public void testMetadataCompressionWithMetastoreTable() throws Exception {
        Map<String, String> properties = Maps.newHashMap();
        properties.put("write.metadata.compression-codec", "gzip");
        Table sourceTable = createMetastoreTable(newTableLocation(), properties, "default", "testMetadataCompression", 2);
        TableMetadata metadata = currentMetadata(sourceTable);

        // End at the metadata-log entry with index 1.
        String endVersion = fileName(metadata.previousFiles().get(1).file());
        RewriteTablePath.Result upToEnd = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .endVersion(endVersion)
                .execute();
        checkFileNum(2, 1, 1, 5, upToEnd);

        // Start from the metadata-log entry with index 0.
        String startVersion = fileName(metadata.previousFiles().get(0).file());
        RewriteTablePath.Result fromStart = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .startVersion(startVersion)
                .execute();
        checkFileNum(2, 2, 2, 8, fromStart);
    }

    /**
     * Changes {@code write.metadata.path} after the first snapshot, inserts another row, and
     * checks the file counts of a full rewrite and of rewrites that end at, and start from, the
     * metadata file that was current before the property change.
     */
    @Test
    public void testMetadataLocationChange() throws Exception {
        Table sourceTable = createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1);
        String originalMetadataFile = currentMetadata(sourceTable).metadataFileLocation();

        String relocatedMetadataDir = newTableLocation() + "new-metadata-dir";
        sourceTable.updateProperties().set("write.metadata.path", relocatedMetadataDir).commit();
        spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
        sourceTable.refresh();

        RewriteTablePath.Result full = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .execute();
        checkFileNum(4, 2, 2, 10, full);

        RewriteTablePath.Result upToOriginal = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .endVersion(fileName(originalMetadataFile))
                .execute();
        checkFileNum(2, 1, 1, 5, upToOriginal);

        RewriteTablePath.Result fromOriginal = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .startVersion(fileName(originalMetadataFile))
                .execute();
        checkFileNum(2, 1, 1, 5, fromOriginal);
    }

    /**
     * Runs a SQL {@code DELETE} on a format-version 2 table configured for merge-on-read deletes,
     * rewrites and copies the table, registers the copy in the catalog and verifies that it
     * returns the same rows as the source.
     */
    @Test
    public void testDeleteFrom() throws Exception {
        final String sourceName = "v2tbl";
        final String copyName = "copiedV2Table";

        Map<String, String> properties = Maps.newHashMap();
        properties.put("format-version", "2");
        properties.put("write.delete.mode", "merge-on-read");
        Table sourceTable = createMetastoreTable(newTableLocation(), properties, "default", sourceName, 0);

        List<ThreeColumnRecord> records = Lists.newArrayList(
                new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
                new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
                new ThreeColumnRecord(3, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1)
                .select("c1", "c2", "c3")
                .write().format("iceberg").mode("append").saveAsTable("hive.default." + sourceName);
        sourceTable.refresh();

        spark.sql(String.format("delete from hive.default.%s where c1 = 1", sourceName));
        sourceTable.refresh();

        List<Object[]> expectedRows = rowsToJava(
                spark.read().format("iceberg").load("hive.default." + sourceName)
                        .sort("c1", "c2", "c3")
                        .collectAsList());
        Assertions.assertThat(expectedRows.size()).isEqualTo(2);

        RewriteTablePath.Result result = actions().rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .execute();
        checkFileNum(3, 2, 2, 9, result);
        copyTableFiles(result);

        // Register the copied metadata file under a new identifier so it can be queried by name.
        String copiedMetadataFile =
                targetTableLocation() + "/metadata/" + fileName(currentMetadata(sourceTable).metadataFileLocation());
        catalog.registerTable(TableIdentifier.of("default", copyName), copiedMetadataFile);

        List<Object[]> copiedRows = rowsToJava(
                spark.read().format("iceberg").load("hive.default." + copyName)
                        .sort("c1", "c2", "c3")
                        .collectAsList());
        assertEquals("Rows must match", expectedRows, copiedRows);
    }

    /**
     * Checks that the table broadcast by the rewrite action can still be read, with Kryo configured
     * as the Spark serializer, after its locally held copies have been removed.
     */
    @Test
    public void testKryoDeserializeBroadcastValues() {
        sparkContext.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        Broadcast<?> tableBroadcast = actions().rewriteTablePath(this.table).tableBroadcast();

        // Drop the local copies first; presumably this forces getValue() to deserialize the
        // broadcast instead of returning the instance already held in this JVM.
        removeBroadcastValuesFromLocalBlockManager(tableBroadcast.id());

        Table broadcastTable = (Table) tableBroadcast.getValue();
        Assertions.assertThat(broadcastTable.uuid()).isEqualTo(this.table.uuid());
    }

    /**
     * Overload of {@link #checkFileNum(int, int, int, int, int, RewriteTablePath.Result)} that
     * expects zero {@code .stats} entries in the file list.
     *
     * @param i expected number of {@code .metadata.json} entries
     * @param i2 expected number of manifest-list entries ({@code snap-*.avro})
     * @param i3 expected number of manifest entries ({@code -m0.avro} / {@code -m1.avro})
     * @param i4 expected total number of entries in the file list
     * @param result rewrite result whose file list is inspected
     */
    protected void checkFileNum(int i, int i2, int i3, int i4, RewriteTablePath.Result result) {
        final int expectedStatsFiles = 0;
        checkFileNum(i, i2, i3, expectedStatsFiles, i4, result);
    }

    /**
     * Loads the file list written by a rewrite as plain text lines and asserts how many lines fall
     * into each file category, as well as the total number of lines.
     *
     * @param i expected number of lines ending with {@code .metadata.json}
     * @param i2 expected number of lines containing {@code snap-} and ending with {@code .avro}
     * @param i3 expected number of lines ending with {@code -m0.avro} or {@code -m1.avro}
     * @param i4 expected number of lines ending with {@code .stats}
     * @param i5 expected total number of lines in the file list
     * @param result rewrite result providing the file list location
     */
    protected void checkFileNum(int i, int i2, int i3, int i4, int i5, RewriteTablePath.Result result) {
        List<String> filesToMove = spark.read()
                .format("text")
                .load(result.fileListLocation())
                .as(Encoders.STRING())
                .collectAsList();

        // Typed predicates: with raw Predicate the lambda argument would be Object and the
        // String methods below would not resolve.
        Predicate<String> isVersionFile = line -> line.endsWith(".metadata.json");
        Predicate<String> isManifestList = line -> line.contains("snap-") && line.endsWith(".avro");
        Predicate<String> isManifest = line -> line.endsWith("-m0.avro") || line.endsWith("-m1.avro");
        Predicate<String> isStatsFile = line -> line.endsWith(".stats");

        Assertions.assertThat(filesToMove.stream().filter(isVersionFile).count())
                .as("Wrong rebuilt version file count")
                .isEqualTo(i);
        Assertions.assertThat(filesToMove.stream().filter(isManifestList).count())
                .as("Wrong rebuilt Manifest list file count")
                .isEqualTo(i2);
        Assertions.assertThat(filesToMove.stream().filter(isManifest).count())
                .as("Wrong rebuilt Manifest file file count")
                .isEqualTo(i3);
        // Uses as(...) like the other checks so a failure still reports actual vs. expected;
        // withFailMessage(...) would replace the whole message with the bare description.
        Assertions.assertThat(filesToMove.stream().filter(isStatsFile).count())
                .as("Wrong rebuilt Statistic file count")
                .isEqualTo(i4);
        Assertions.assertThat(filesToMove.size())
                .as("Wrong total file count")
                .isEqualTo(i5);
    }

    /**
     * Returns the location of the {@code newTableDir} directory as a URI string.
     *
     * @return result of {@link #toAbsolute(Path)} for {@code newTableDir}
     * @throws IOException declared by {@link #toAbsolute(Path)}
     */
    protected String newTableLocation() throws IOException {
        Path sourceTableDir = this.newTableDir;
        return toAbsolute(sourceTableDir);
    }

    /**
     * Returns the location of the {@code targetTableDir} directory as a URI string.
     *
     * @return result of {@link #toAbsolute(Path)} for {@code targetTableDir}
     * @throws IOException declared by {@link #toAbsolute(Path)}
     */
    protected String targetTableLocation() throws IOException {
        Path rewriteTargetDir = this.targetTableDir;
        return toAbsolute(rewriteTargetDir);
    }

    /**
     * Returns the location of the {@code staging} directory as a URI string.
     *
     * @return result of {@link #toAbsolute(Path)} for {@code staging}
     * @throws IOException declared by {@link #toAbsolute(Path)}
     */
    protected String stagingLocation() throws IOException {
        Path stagingDir = this.staging;
        return toAbsolute(stagingDir);
    }

    /**
     * Converts a path to the string form of its {@code file:} URI, going through
     * {@link File#toURI()}.
     *
     * @param path path to convert
     * @return the URI of {@code path} as a string
     * @throws IOException never thrown by this implementation; kept in the signature for callers
     *     and overriders
     */
    protected String toAbsolute(Path path) throws IOException {
        File asFile = path.toFile();
        URI location = asFile.toURI();
        return location.toString();
    }

    /**
     * Copies every file named in the rewrite's file list from its first-column URI to its
     * second-column URI.
     *
     * @param result rewrite result providing the file list location
     * @throws Exception if a file cannot be copied
     */
    private void copyTableFiles(RewriteTablePath.Result result) throws Exception {
        List<Tuple2<String, String>> pathPairs = readPathPairList(result.fileListLocation());
        for (Tuple2<String, String> pair : pathPairs) {
            File source = new File(URI.create(pair._1()));
            File destination = new File(URI.create(pair._2()));
            FileUtils.copyFile(source, destination);
        }
    }

    /**
     * Strips everything up to and including the last {@code ':'} from the given string.
     *
     * @param str input string, e.g. a location with a scheme prefix
     * @return the text after the last colon, or the whole string when it contains no colon
     */
    private String removePrefix(String str) {
        int lastColon = str.lastIndexOf(':');
        // lastColon is -1 when there is no colon, which makes the substring start at 0.
        return str.substring(lastColon + 1);
    }

    /**
     * Builds a {@link BaseTable} backed by {@link StaticTableOperations} for the given location.
     *
     * @param str location handed to {@link StaticTableOperations}; also used as the table name
     * @param fileIO file IO handed to {@link StaticTableOperations}
     * @return the new table
     */
    protected Table newStaticTable(String str, FileIO fileIO) {
        StaticTableOperations staticOps = new StaticTableOperations(str, fileIO);
        return new BaseTable(staticOps, str);
    }

    /**
     * Reads a two-column CSV of strings and returns each row as a pair.
     *
     * @param str location of the CSV to load
     * @return one tuple per CSV row, holding the first and second column
     */
    private List<Tuple2<String, String>> readPathPairList(String str) {
        // A typed encoder keeps the whole chain generic, so the returned list needs no
        // unchecked conversion.
        Encoder<Tuple2<String, String>> pairEncoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING());
        return spark.read()
                .format("csv")
                .schema(pairEncoder.schema())
                .load(str)
                .as(pairEncoder)
                .collectAsList();
    }

    /**
     * Configures the {@code hive} Spark catalog, (re)creates a three-column table in it, inserts
     * the requested number of single-row commits, and loads the table through {@code catalog}.
     *
     * @param str table location; when empty, no {@code USING iceberg LOCATION} clause is added
     * @param map table properties; when empty, no {@code TBLPROPERTIES} clause is added
     * @param str2 namespace of the table
     * @param str3 name of the table
     * @param i number of {@code insert} statements to run after creation
     * @return the table loaded from {@code catalog}
     */
    private Table createMetastoreTable(String str, Map<String, String> map, String str2, String str3, int i) {
        spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog.hive.type", "hive");
        spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
        spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");

        // Render the properties as 'key'='value' pairs separated by commas.
        StringBuilder renderedProperties = new StringBuilder();
        for (Map.Entry<String, String> property : map.entrySet()) {
            if (renderedProperties.length() > 0) {
                renderedProperties.append(",");
            }
            renderedProperties.append("'" + property.getKey() + "'='" + property.getValue() + "'");
        }
        String tableProperties = renderedProperties.toString();

        sql("DROP TABLE IF EXISTS hive.%s.%s", str2, str3);

        // Build the CREATE statement incrementally; each optional clause is appended only
        // when its input is non-empty.
        String createStatement = String.format("CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", str2, str3);
        if (!str.isEmpty()) {
            createStatement = String.format("%s USING iceberg LOCATION '%s'", createStatement, str);
        }
        if (!tableProperties.isEmpty()) {
            createStatement = String.format("%s TBLPROPERTIES (%s)", createStatement, tableProperties);
        }
        sql(createStatement);

        for (int row = 0; row < i; row++) {
            sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", str2, str3, row);
        }
        return catalog.loadTable(TableIdentifier.of(str2, str3));
    }

    /**
     * Returns the last segment of a path or location string.
     *
     * <p>Locations in this test are URI strings produced via {@link File#toURI()}, which always
     * use {@code '/'} regardless of platform, so {@code '/'} is recognized in addition to the
     * platform separator.
     *
     * @param str path or URI string
     * @return the text after the last separator, or {@code str} itself when it has no separator
     */
    private static String fileName(String str) {
        int lastSeparator = Math.max(str.lastIndexOf('/'), str.lastIndexOf(File.separatorChar));
        return lastSeparator == -1 ? str : str.substring(lastSeparator + 1);
    }

    /**
     * Returns the current metadata of a table through its table operations.
     *
     * @param table table to inspect; must implement {@link HasTableOperations}
     * @return the metadata reported by {@code operations().current()}
     */
    private TableMetadata currentMetadata(Table table) {
        HasTableOperations withOperations = (HasTableOperations) table;
        return withOperations.operations().current();
    }

    /**
     * Loads the given location with the {@code iceberg} format and returns all of its rows.
     *
     * @param str table name or location to load
     * @return the rows converted by {@code rowsToJava}
     */
    private List<Object[]> rows(String str) {
        return rowsToJava(
                spark.read()
                        .format("iceberg")
                        .load(str)
                        .collectAsList());
    }

    /**
     * Loads the given location with the {@code iceberg} format and returns its rows sorted by a
     * single column.
     *
     * @param str table name or location to load
     * @param str2 name of the column to sort by
     * @return the sorted rows converted by {@code rowsToJava}
     */
    private List<Object[]> rowsSorted(String str, String str2) {
        return rowsToJava(
                spark.read()
                        .format("iceberg")
                        .load(str)
                        .sort(str2)
                        .collectAsList());
    }

    /**
     * Creates a position delete whose row is a {@link GenericRecord} filled from the given values.
     *
     * @param schema schema used to create the row record
     * @param charSequence first argument passed to {@code PositionDelete.set} (presumably the
     *     path of the data file being deleted from)
     * @param l position passed to {@code PositionDelete.set}; must not be null
     * @param objArr field values, assigned to the record by index in the order given
     * @return the populated position delete
     */
    private PositionDelete<GenericRecord> positionDelete(Schema schema, CharSequence charSequence, Long l, Object... objArr) {
        PositionDelete<GenericRecord> delete = PositionDelete.create();
        GenericRecord row = GenericRecord.create(schema);

        int fieldIndex = 0;
        for (Object fieldValue : objArr) {
            row.set(fieldIndex, fieldValue);
            fieldIndex++;
        }

        long position = l.longValue();
        delete.set(charSequence, position, row);
        return delete;
    }

    /**
     * Removes the locally held copies of a broadcast: clears the broadcast manager's cached
     * values, then removes the broadcast's block from the block info manager and from the memory
     * store of the current {@link SparkEnv}.
     *
     * @param j id of the broadcast whose block is removed
     */
    private void removeBroadcastValuesFromLocalBlockManager(long j) {
        SparkEnv env = SparkEnv.get();
        BroadcastBlockId blockId = new BroadcastBlockId(j, "");

        env.broadcastManager().cachedValues().clear();

        BlockManager localBlockManager = env.blockManager();
        BlockInfoManager infoManager = localBlockManager.blockInfoManager();
        // NOTE(review): the write lock is taken right before removeBlock, presumably because
        // removal requires holding it - confirm against the Spark BlockInfoManager contract.
        infoManager.lockForWriting(blockId, true);
        infoManager.removeBlock(blockId);
        localBlockManager.memoryStore().remove(blockId);
    }
}
