package org.apache.iceberg.spark.extensions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestUpdate.class */
public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
    public TestUpdate(String str, String str2, Map<String, String> map, String str3, boolean z, String str4, String str5) {
        super(str, str2, map, str3, z, str4, str5);
    }

    @BeforeClass
    public static void setupSparkConf() {
        spark.conf().set("spark.sql.shuffle.partitions", "4");
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
        sql("DROP TABLE IF EXISTS updated_id", new Object[0]);
        sql("DROP TABLE IF EXISTS updated_dep", new Object[0]);
        sql("DROP TABLE IF EXISTS deleted_employee", new Object[0]);
    }

    @Test
    public void testCoalesceUpdate() {
        createAndInitTable("id INT, dep STRING");
        String[] strArr = new String[100];
        for (int i = 0; i < 100; i++) {
            strArr[i] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", Integer.valueOf(i));
        }
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("read.split.open-file-cost", String.valueOf(Integer.MAX_VALUE), "write.update.distribution-mode", DistributionMode.RANGE.modeName()))});
        createBranchIfNeeded();
        withSQLConf(ImmutableMap.of(SQLConf.SHUFFLE_PARTITIONS().key(), "200", SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), () -> {
            Assertions.assertThat(executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", new Object[]{commitTarget()}).toString()).contains(new CharSequence[]{"REBALANCE_PARTITIONS_BY_COL"});
        });
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateProperty(latestSnapshot, "added-data-files", "1");
        } else {
            validateProperty(latestSnapshot, "added-delete-files", "1");
        }
        Assert.assertEquals("Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", new Object[]{commitTarget()}));
    }

    @Test
    public void testSkewUpdate() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        String[] strArr = new String[100];
        for (int i = 0; i < 100; i++) {
            strArr[i] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", Integer.valueOf(i));
        }
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("read.split.open-file-cost", String.valueOf(Integer.MAX_VALUE), "write.update.distribution-mode", DistributionMode.HASH.modeName()))});
        createBranchIfNeeded();
        withSQLConf(ImmutableMap.of(SQLConf.SHUFFLE_PARTITIONS().key(), "2", SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true", SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"), () -> {
            Assertions.assertThat(executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", new Object[]{commitTarget()}).toString()).contains(new CharSequence[]{"REBALANCE_PARTITIONS_BY_COL"});
        });
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateProperty(latestSnapshot, "added-data-files", "4");
        } else {
            validateProperty(latestSnapshot, "added-delete-files", "4");
        }
        Assert.assertEquals("Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", new Object[]{commitTarget()}));
    }

    @Test
    public void testExplain() {
        createAndInitTable("id INT, dep STRING");
        sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        sql("EXPLAIN UPDATE %s SET dep = 'invalid' WHERE id <=> 1", new Object[]{commitTarget()});
        sql("EXPLAIN UPDATE %s SET dep = 'invalid' WHERE true", new Object[]{commitTarget()});
        Assert.assertEquals("Should have 1 snapshot", 1L, Iterables.size(this.validationCatalog.loadTable(this.tableIdent).snapshots()));
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateEmptyTable() {
        Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(this.branch));
        createAndInitTable("id INT, dep STRING");
        sql("UPDATE %s SET dep = 'invalid' WHERE id IN (1)", new Object[]{commitTarget()});
        sql("UPDATE %s SET id = -1 WHERE dep = 'hr'", new Object[]{commitTarget()});
        Assert.assertEquals("Should have 2 snapshots", 2L, Iterables.size(this.validationCatalog.loadTable(this.tableIdent).snapshots()));
        assertEquals("Should have expected rows", ImmutableList.of(), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateNonExistingCustomBranch() {
        Assume.assumeTrue("Test only applicable to custom branch", "test".equals(this.branch));
        createAndInitTable("id INT, dep STRING");
        Assertions.assertThatThrownBy(() -> {
            sql("UPDATE %s SET dep = 'invalid' WHERE id IN (1)", new Object[]{commitTarget()});
        }).isInstanceOf(ValidationException.class).hasMessage("Cannot use branch (does not exist): test");
    }

    @Test
    public void testUpdateWithAlias() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"a\" }");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("UPDATE %s AS t SET t.dep = 'invalid'", new Object[]{commitTarget()});
        Assert.assertEquals("Should have 2 snapshots", 2L, Iterables.size(this.validationCatalog.loadTable(this.tableIdent).snapshots()));
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "invalid"})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateAlignsAssignments() {
        createAndInitTable("id INT, c1 INT, c2 INT");
        sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", new Object[]{this.tableName});
        createBranchIfNeeded();
        sql("UPDATE %s SET `c2` = c2 - 2, c1 = `c1` - 1 WHERE id <=> 1", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 10, 109}), row(new Object[]{2, 22, 222})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithUnsupportedPartitionPredicate() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'software'), (2, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        sql("UPDATE %s t SET `t`.`id` = -1 WHERE t.dep LIKE '%%r' ", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{1, "software"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithDynamicFileFiltering() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 3, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 1, \"dep\": \"hardware\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        sql("UPDATE %s SET id = cast('-1' AS INT) WHERE id = 2", new Object[]{commitTarget()});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should have 3 snapshots", 3L, Iterables.size(loadTable.snapshots()));
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateCopyOnWrite(latestSnapshot, "1", "1", "1");
        } else {
            validateMergeOnRead(latestSnapshot, "1", "1", "1");
        }
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{1, "hardware"}), row(new Object[]{1, "hr"}), row(new Object[]{3, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{commitTarget()}));
    }

    @Test
    public void testUpdateNonExistingRecords() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        sql("UPDATE %s SET id = -1 WHERE id > 10", new Object[]{commitTarget()});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should have 2 snapshots", 2L, Iterables.size(loadTable.snapshots()));
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateCopyOnWrite(latestSnapshot, "0", null, null);
        } else {
            validateMergeOnRead(latestSnapshot, "0", null, null);
        }
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithoutCondition() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("ALTER TABLE %s WRITE DISTRIBUTED BY PARTITION", new Object[]{this.tableName});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        sql("INSERT INTO TABLE %s VALUES (2, 'hardware')", new Object[]{commitTarget()});
        sql("INSERT INTO TABLE %s VALUES (null, 'hr')", new Object[]{commitTarget()});
        withSQLConf(ImmutableMap.of(SQLConf.SHUFFLE_PARTITIONS().key(), "200"), () -> {
            sql("UPDATE %s SET id = -1", new Object[]{commitTarget()});
        });
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should have 4 snapshots", 4L, Iterables.size(loadTable.snapshots()));
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        Assert.assertEquals("Operation must match", "overwrite", latestSnapshot.operation());
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            Assert.assertEquals("Operation must match", "overwrite", latestSnapshot.operation());
            validateProperty(latestSnapshot, "changed-partition-count", "2");
            validateProperty(latestSnapshot, "deleted-data-files", "3");
            validateProperty(latestSnapshot, "added-data-files", (Set<String>) ImmutableSet.of("2", "3"));
        } else {
            validateMergeOnRead(latestSnapshot, "2", "2", "2");
        }
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{-1, "hr"}), row(new Object[]{-1, "hr"})), sql("SELECT * FROM %s ORDER BY dep ASC", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithNullConditions() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 0, \"dep\": null }\n{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        createBranchIfNeeded();
        sql("UPDATE %s SET id = -1 WHERE dep = NULL", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{0, null}), row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = -1 WHERE dep = 'software'", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{0, null}), row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        sql("UPDATE %s SET dep = 'invalid', id = -1 WHERE dep <=> NULL", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "invalid"}), row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithInAndNotInConditions() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        sql("UPDATE %s SET id = -1 WHERE id IN (1, null)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = 100 WHERE id NOT IN (null, 1)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = 100 WHERE id NOT IN (1, 10)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{100, "hardware"}), row(new Object[]{100, "hr"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithMultipleRowGroupsParquet() throws NoSuchTableException {
        Assume.assumeTrue(this.fileFormat.equalsIgnoreCase("parquet"));
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", new Object[]{this.tableName, "write.parquet.row-group-size-bytes", 100});
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", new Object[]{this.tableName, "read.split.target-size", 100});
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(200);
        for (int i = 1; i <= 200; i++) {
            newArrayListWithCapacity.add(Integer.valueOf(i));
        }
        spark.createDataset(newArrayListWithCapacity, Encoders.INT()).withColumnRenamed("value", "id").withColumn("dep", functions.lit("hr")).coalesce(1).writeTo(this.tableName).append();
        createBranchIfNeeded();
        Assert.assertEquals(200L, spark.table(commitTarget()).count());
        sql("UPDATE %s SET id = -1 WHERE id IN (200, 201)", new Object[]{commitTarget()});
        Assert.assertEquals(200L, spark.table(commitTarget()).count());
    }

    @Test
    public void testUpdateNestedStructFields() {
        createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>", "{ \"id\": 1, \"s\": { \"c1\": 2, \"c2\": { \"a\": [1,2], \"m\": { \"a\": \"b\"} } } } }");
        sql("UPDATE %s SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a = array(-1)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{-1, row(new Object[]{ImmutableList.of(-1), ImmutableMap.of("k", "v")})})})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
        sql("UPDATE %s SET s.c1 = NULL, s.c2 = NULL WHERE id IN (1)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{null, null})})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
        sql("UPDATE %s SET s = named_struct('c1', 1, 'c2', named_struct('a', array(1), 'm', null))", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{1, row(new Object[]{ImmutableList.of(1), null})})})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithUserDefinedDistribution() {
        createAndInitTable("id INT, c2 INT, c3 INT");
        sql("ALTER TABLE %s ADD PARTITION FIELD bucket(8, c3)", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"c2\": 11, \"c3\": 1 }\n{ \"id\": 2, \"c2\": 22, \"c3\": 1 }\n{ \"id\": 3, \"c2\": 33, \"c3\": 1 }");
        createBranchIfNeeded();
        sql("ALTER TABLE %s WRITE ORDERED BY c2", new Object[]{this.tableName});
        sql("UPDATE %s SET c2 = -22 WHERE id NOT IN (1, 3)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 11, 1}), row(new Object[]{2, -22, 1}), row(new Object[]{3, 33, 1})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("ALTER TABLE %s WRITE LOCALLY ORDERED BY id", new Object[]{this.tableName});
        sql("UPDATE %s SET c2 = -33 WHERE id = 3", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 11, 1}), row(new Object[]{2, -22, 1}), row(new Object[]{3, -33, 1})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("ALTER TABLE %s WRITE DISTRIBUTED BY PARTITION ORDERED BY id", new Object[]{this.tableName});
        sql("UPDATE %s SET c2 = -11 WHERE id = 1", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, -11, 1}), row(new Object[]{2, -22, 1}), row(new Object[]{3, -33, 1})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public synchronized void testUpdateWithSerializableIsolation() throws InterruptedException {
        Assume.assumeFalse(this.catalogName.equalsIgnoreCase("testhadoop"));
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.update.isolation-level", "serializable"});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("UPDATE %s SET id = -1 WHERE id = 1", new Object[]{commitTarget()});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
            GenericRecord create = GenericRecord.create(SnapshotUtil.schemaFor(loadTable, this.branch));
            create.set(0, 1);
            create.set(1, "hr");
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicBoolean.get() && atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                if (!atomicBoolean.get()) {
                    return;
                }
                for (int i2 = 0; i2 < 5; i2++) {
                    AppendFiles appendFile = loadTable.newFastAppend().appendFile(writeDataFile(loadTable, ImmutableList.of(create)));
                    if (this.branch != null) {
                        appendFile.toBranch(this.branch);
                    }
                    appendFile.commit();
                    sleep(10L);
                }
                atomicInteger.incrementAndGet();
            }
        });
        try {
            Objects.requireNonNull(submit);
            Assertions.assertThatThrownBy(submit::get).isInstanceOf(ExecutionException.class).cause().isInstanceOf(ValidationException.class).hasMessageContaining("Found conflicting files that can contain");
            atomicBoolean.set(false);
            submit2.cancel(true);
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            atomicBoolean.set(false);
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public synchronized void testUpdateWithSnapshotIsolation() throws InterruptedException, ExecutionException {
        Assume.assumeFalse(this.catalogName.equalsIgnoreCase("testhadoop"));
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.update.isolation-level", "snapshot"});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < 20; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("UPDATE %s SET id = -1 WHERE id = 1", new Object[]{this.tableName});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
            GenericRecord create = GenericRecord.create(SnapshotUtil.schemaFor(loadTable, this.branch));
            create.set(0, 1);
            create.set(1, "hr");
            for (int i = 0; i < 20; i++) {
                while (atomicBoolean.get() && atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                if (!atomicBoolean.get()) {
                    return;
                }
                for (int i2 = 0; i2 < 5; i2++) {
                    AppendFiles appendFile = loadTable.newFastAppend().appendFile(writeDataFile(loadTable, ImmutableList.of(create)));
                    if (this.branch != null) {
                        appendFile.toBranch(this.branch);
                    }
                    appendFile.commit();
                    sleep(10L);
                }
                atomicInteger.incrementAndGet();
            }
        });
        try {
            submit.get();
            atomicBoolean.set(false);
            submit2.cancel(true);
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            atomicBoolean.set(false);
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public void testUpdateWithInferredCasts() {
        createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }");
        sql("UPDATE %s SET s = -1 WHERE id = 1", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "-1"})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateModifiesNullStruct() {
        createAndInitTable("id INT, s STRUCT<n1:INT,n2:INT>", "{ \"id\": 1, \"s\": null }");
        sql("UPDATE %s SET s.n1 = -1 WHERE id = 1", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, row(new Object[]{-1, null})})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateRefreshesRelationCache() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 3, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 1, \"dep\": \"hardware\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        spark.sql("SELECT * FROM " + commitTarget() + " WHERE id = 1").createOrReplaceTempView("tmp");
        spark.sql("CACHE TABLE tmp");
        assertEquals("View should have correct data", ImmutableList.of(row(new Object[]{1, "hardware"}), row(new Object[]{1, "hr"})), sql("SELECT * FROM tmp ORDER BY id, dep", new Object[0]));
        sql("UPDATE %s SET id = -1 WHERE id = 1", new Object[]{commitTarget()});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should have 3 snapshots", 3L, Iterables.size(loadTable.snapshots()));
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateCopyOnWrite(latestSnapshot, "2", "2", "2");
        } else {
            validateMergeOnRead(latestSnapshot, "2", "2", "2");
        }
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{-1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{3, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{commitTarget()}));
        assertEquals("Should refresh the relation cache", ImmutableList.of(), sql("SELECT * FROM tmp ORDER BY id, dep", new Object[0]));
        spark.sql("UNCACHE TABLE tmp");
    }

    @Test
    public void testUpdateWithInSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        createOrReplaceView("updated_id", Arrays.asList(0, 1, null), Encoders.INT());
        createOrReplaceView("updated_dep", Arrays.asList("software", "hr"), Encoders.STRING());
        sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM updated_id) AND dep IN (SELECT * from updated_dep)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = 5 WHERE id IS NULL OR id IN (SELECT value + 1 FROM updated_id)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{5, "hardware"}), row(new Object[]{5, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
        append(commitTarget(), "{ \"id\": null, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{2, "hr"}), row(new Object[]{5, "hardware"}), row(new Object[]{5, "hr"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = 10 WHERE id IN (SELECT value + 2 FROM updated_id) AND dep = 'hr'", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{5, "hardware"}), row(new Object[]{5, "hr"}), row(new Object[]{10, "hr"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithInSubqueryAndDynamicFileFiltering() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("ALTER TABLE %s WRITE DISTRIBUTED BY PARTITION", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 3, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 1, \"dep\": \"hardware\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        createOrReplaceView("updated_id", Arrays.asList(-1, 2), Encoders.INT());
        sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM updated_id)", new Object[]{commitTarget()});
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should have 3 snapshots", 3L, Iterables.size(loadTable.snapshots()));
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateCopyOnWrite(latestSnapshot, "1", "1", "1");
        } else {
            validateMergeOnRead(latestSnapshot, "1", "1", "1");
        }
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{1, "hardware"}), row(new Object[]{1, "hr"}), row(new Object[]{3, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{commitTarget()}));
    }

    @Test
    public void testUpdateWithSelfSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        sql("UPDATE %s SET dep = 'x' WHERE id IN (SELECT id + 1 FROM %s)", new Object[]{commitTarget(), commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "hr"}), row(new Object[]{2, "x"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        withSQLConf(ImmutableMap.of(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false"), () -> {
            sql("UPDATE %s SET dep = 'y' WHERE id = (SELECT count(*) FROM (SELECT DISTINCT id FROM %s) AS t)", new Object[]{commitTarget(), commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "hr"}), row(new Object[]{2, "y"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        });
        sql("UPDATE %s SET id = (SELECT id - 2 FROM %s WHERE id = 1)", new Object[]{commitTarget(), commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{-1, "y"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithMultiColumnInSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        createOrReplaceView("deleted_employee", Arrays.asList(new Employee(null, "hr"), new Employee(1, "hr")), Encoders.bean(Employee.class));
        sql("UPDATE %s SET dep = 'x', id = -1 WHERE (id, dep) IN (SELECT id, dep FROM deleted_employee)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "x"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithNotInSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        createOrReplaceView("updated_id", Arrays.asList(-1, -2, null), Encoders.INT());
        createOrReplaceView("updated_dep", Arrays.asList("software", "hr"), Encoders.STRING());
        sql("UPDATE %s SET id = -1 WHERE id NOT IN (SELECT * FROM updated_id)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = -1 WHERE id NOT IN (SELECT * FROM updated_id WHERE value IS NOT NULL)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{-1, "hr"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", new Object[]{selectTarget()}));
        sql("UPDATE %s SET id = 5 WHERE id NOT IN (SELECT * FROM updated_id) OR dep IN ('software', 'hr')", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{5, "hr"}), row(new Object[]{5, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithExistSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        createOrReplaceView("updated_id", Arrays.asList(-1, -2, null), Encoders.INT());
        createOrReplaceView("updated_dep", Arrays.asList("hr", null), Encoders.STRING());
        sql("UPDATE %s t SET id = -1 WHERE EXISTS (SELECT 1 FROM updated_id u WHERE t.id = u.value)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("UPDATE %s t SET dep = 'x', id = -1 WHERE EXISTS (SELECT 1 FROM updated_id u WHERE t.id = u.value + 2)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "x"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        sql("UPDATE %s t SET id = -2 WHERE EXISTS (SELECT 1 FROM updated_id u WHERE t.id = u.value) OR t.id IS NULL", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-2, "hr"}), row(new Object[]{-2, "x"}), row(new Object[]{2, "hardware"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
        sql("UPDATE %s t SET id = 1 WHERE EXISTS (SELECT 1 FROM updated_id ui WHERE t.id = ui.value) AND EXISTS (SELECT 1 FROM updated_dep ud WHERE t.dep = ud.value)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-2, "x"}), row(new Object[]{1, "hr"}), row(new Object[]{2, "hardware"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithNotExistsSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        createOrReplaceView("updated_id", Arrays.asList(-1, -2, null), Encoders.INT());
        createOrReplaceView("updated_dep", Arrays.asList("hr", "software"), Encoders.STRING());
        sql("UPDATE %s t SET id = -1 WHERE NOT EXISTS (SELECT 1 FROM updated_id u WHERE t.id = u.value + 2)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{-1, "hr"}), row(new Object[]{1, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
        sql("UPDATE %s t SET id = 5 WHERE NOT EXISTS (SELECT 1 FROM updated_id u WHERE t.id = u.value) OR t.id = 1", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{-1, "hr"}), row(new Object[]{5, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
        sql("UPDATE %s t SET id = 10 WHERE NOT EXISTS (SELECT 1 FROM updated_id ui WHERE t.id = ui.value) AND EXISTS (SELECT 1 FROM updated_dep ud WHERE t.dep = ud.value)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{-1, "hr"}), row(new Object[]{10, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithScalarSubquery() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hardware\" }\n{ \"id\": null, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        createOrReplaceView("updated_id", Arrays.asList(1, 100, null), Encoders.INT());
        withSQLConf(ImmutableMap.of(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false"), () -> {
            sql("UPDATE %s SET id = -1 WHERE id <= (SELECT min(value) FROM updated_id)", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{2, "hardware"}), row(new Object[]{null, "hr"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
        });
    }

    @Test
    public void testUpdateThatRequiresGroupingBeforeWrite() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 0, \"dep\": \"hr\" }\n{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 0, \"dep\": \"ops\" }\n{ \"id\": 1, \"dep\": \"ops\" }\n{ \"id\": 2, \"dep\": \"ops\" }");
        append(commitTarget(), "{ \"id\": 0, \"dep\": \"hr\" }\n{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");
        append(commitTarget(), "{ \"id\": 0, \"dep\": \"ops\" }\n{ \"id\": 1, \"dep\": \"ops\" }\n{ \"id\": 2, \"dep\": \"ops\" }");
        createOrReplaceView("updated_id", Arrays.asList(1, 100), Encoders.INT());
        String str = spark.conf().get("spark.sql.shuffle.partitions");
        try {
            spark.conf().set("spark.sql.shuffle.partitions", "1");
            sql("UPDATE %s t SET id = -1 WHERE id IN (SELECT * FROM updated_id)", new Object[]{commitTarget()});
            Assert.assertEquals("Should have expected num of rows", 12L, spark.table(commitTarget()).count());
            spark.conf().set("spark.sql.shuffle.partitions", str);
        } catch (Throwable th) {
            spark.conf().set("spark.sql.shuffle.partitions", str);
            throw th;
        }
    }

    @Test
    public void testUpdateWithVectorization() {
        createAndInitTable("id INT, dep STRING");
        append(this.tableName, "{ \"id\": 0, \"dep\": \"hr\" }\n{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        withSQLConf(ImmutableMap.of("spark.sql.iceberg.vectorization.enabled", "true"), () -> {
            sql("UPDATE %s t SET id = -1", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hr"}), row(new Object[]{-1, "hr"}), row(new Object[]{-1, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
        });
    }

    @Test
    public void testUpdateModifyPartitionSourceField() throws NoSuchTableException {
        createAndInitTable("id INT, dep STRING, country STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD bucket(4, id)", new Object[]{this.tableName});
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(100);
        for (int i = 1; i <= 100; i++) {
            newArrayListWithCapacity.add(Integer.valueOf(i));
        }
        spark.createDataset(newArrayListWithCapacity, Encoders.INT()).withColumnRenamed("value", "id").withColumn("dep", functions.lit("hr")).withColumn("country", functions.lit("usa")).coalesce(1).writeTo(this.tableName).append();
        createBranchIfNeeded();
        spark.createDataset(newArrayListWithCapacity, Encoders.INT()).withColumnRenamed("value", "id").withColumn("dep", functions.lit("software")).withColumn("country", functions.lit("usa")).coalesce(1).writeTo(commitTarget()).append();
        spark.createDataset(newArrayListWithCapacity, Encoders.INT()).withColumnRenamed("value", "id").withColumn("dep", functions.lit("hardware")).withColumn("country", functions.lit("usa")).coalesce(1).writeTo(commitTarget()).append();
        sql("UPDATE %s SET id = -1 WHERE id IN (10, 11, 12, 13, 14, 15, 16, 17, 18, 19)", new Object[]{commitTarget()});
        Assert.assertEquals(30L, scalarSql("SELECT count(*) FROM %s WHERE id = -1", new Object[]{selectTarget()}));
    }

    @Test
    public void testUpdateWithStaticPredicatePushdown() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"dep\": \"software\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 1, \"dep\": \"hr\" }");
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        Assert.assertEquals("Must have 2 files before UPDATE", "2", (String) latestSnapshot.summary().get("total-data-files"));
        loadTable.io().deleteFile(((DataFile) Iterables.getOnlyElement(latestSnapshot.addedDataFiles(loadTable.io()))).path().toString());
        withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
            sql("UPDATE %s SET id = -1 WHERE dep IN ('software') AND id == 1", new Object[]{commitTarget()});
        });
    }

    @Test
    public void testUpdateWithInvalidUpdates() {
        createAndInitTable("id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>", "{ \"id\": 0, \"a\": null, \"m\": null }");
        AssertHelpers.assertThrows("Should complain about updating an array column", AnalysisException.class, "Updating nested fields is only supported for structs", () -> {
            return sql("UPDATE %s SET a.c1 = 1", new Object[]{commitTarget()});
        });
        AssertHelpers.assertThrows("Should complain about updating a map column", AnalysisException.class, "Updating nested fields is only supported for structs", () -> {
            return sql("UPDATE %s SET m.key = 'new_key'", new Object[]{commitTarget()});
        });
    }

    @Test
    public void testUpdateWithConflictingAssignments() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 0, \"s\": null }");
        AssertHelpers.assertThrows("Should complain about conflicting updates to a top-level column", AnalysisException.class, "Updates are in conflict", () -> {
            return sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2", new Object[]{commitTarget()});
        });
        AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column", AnalysisException.class, "Updates are in conflict for these columns", () -> {
            return sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", new Object[]{commitTarget()});
        });
        AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column", AnalysisException.class, "Updates are in conflict", () -> {
            sql("UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))", new Object[]{commitTarget()});
        });
    }

    @Test
    public void testUpdateWithInvalidAssignments() {
        createAndInitTable("id INT NOT NULL, s STRUCT<n1:INT NOT NULL,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL", "{ \"id\": 0, \"s\": { \"n1\": 1, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        for (String str : new String[]{"ansi", "strict"}) {
            withSQLConf(ImmutableMap.of("spark.sql.storeAssignmentPolicy", str), () -> {
                AssertHelpers.assertThrows("Should complain about writing nulls to a top-level column", AnalysisException.class, "Cannot write nullable values to non-null column", () -> {
                    return sql("UPDATE %s t SET t.id = NULL", new Object[]{commitTarget()});
                });
                AssertHelpers.assertThrows("Should complain about writing nulls to a nested column", AnalysisException.class, "Cannot write nullable values to non-null column", () -> {
                    return sql("UPDATE %s t SET t.s.n1 = NULL", new Object[]{commitTarget()});
                });
                AssertHelpers.assertThrows("Should complain about writing missing fields in structs", AnalysisException.class, "missing fields", () -> {
                    return sql("UPDATE %s t SET t.s = named_struct('n1', 1)", new Object[]{commitTarget()});
                });
                AssertHelpers.assertThrows("Should complain about writing invalid data types", AnalysisException.class, "Cannot safely cast", () -> {
                    return sql("UPDATE %s t SET t.s.n1 = 'str'", new Object[]{commitTarget()});
                });
                AssertHelpers.assertThrows("Should complain about writing incompatible structs", AnalysisException.class, "field name does not match", () -> {
                    return sql("UPDATE %s t SET t.s.n2 = named_struct('dn2', 1, 'dn1', 2)", new Object[]{commitTarget()});
                });
            });
        }
    }

    @Test
    public void testUpdateWithNonDeterministicCondition() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
        AssertHelpers.assertThrows("Should complain about non-deterministic expressions", AnalysisException.class, "nondeterministic expressions are only allowed", () -> {
            return sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5", new Object[]{commitTarget()});
        });
    }

    @Test
    public void testUpdateOnNonIcebergTableNotSupported() {
        createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("UPDATE is not supported for non iceberg table", UnsupportedOperationException.class, "not supported temporarily", () -> {
            return sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", new Object[]{"testtable"});
        });
    }

    @Test
    public void testUpdateToWAPBranch() {
        Assume.assumeTrue("WAP branch only works for table identifier without branch", this.branch == null);
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"a\" }");
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        withSQLConf(ImmutableMap.of("spark.wap.branch", "wap"), () -> {
            sql("UPDATE %s SET dep='hr' WHERE dep='a'", new Object[]{this.tableName});
            Assert.assertEquals("Should have expected num of rows when reading table", 2L, sql("SELECT * FROM %s WHERE dep='hr'", new Object[]{this.tableName}).size());
            Assert.assertEquals("Should have expected num of rows when reading WAP branch", 2L, sql("SELECT * FROM %s.branch_wap WHERE dep='hr'", new Object[]{this.tableName}).size());
            Assert.assertEquals("Should not modify main branch", 1L, sql("SELECT * FROM %s.branch_main WHERE dep='hr'", new Object[]{this.tableName}).size());
        });
        withSQLConf(ImmutableMap.of("spark.wap.branch", "wap"), () -> {
            sql("UPDATE %s SET dep='b' WHERE dep='hr'", new Object[]{this.tableName});
            Assert.assertEquals("Should have expected num of rows when reading table with multiple writes", 2L, sql("SELECT * FROM %s WHERE dep='b'", new Object[]{this.tableName}).size());
            Assert.assertEquals("Should have expected num of rows when reading WAP branch with multiple writes", 2L, sql("SELECT * FROM %s.branch_wap WHERE dep='b'", new Object[]{this.tableName}).size());
            Assert.assertEquals("Should not modify main branch with multiple writes", 0L, sql("SELECT * FROM %s.branch_main WHERE dep='b'", new Object[]{this.tableName}).size());
        });
    }

    @Test
    public void testUpdateToWapBranchWithTableBranchIdentifier() {
        Assume.assumeTrue("Test must have branch name part in table identifier", this.branch != null);
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        withSQLConf(ImmutableMap.of("spark.wap.branch", "wap"), () -> {
            Assertions.assertThatThrownBy(() -> {
                sql("UPDATE %s SET dep='hr' WHERE dep='a'", new Object[]{commitTarget()});
            }).isInstanceOf(ValidationException.class).hasMessage(String.format("Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]", this.branch));
        });
    }

    private RowLevelOperationMode mode(Table table) {
        return RowLevelOperationMode.fromName((String) table.properties().getOrDefault("write.update.mode", TableProperties.UPDATE_MODE_DEFAULT));
    }
}
