package org.apache.iceberg.spark.extensions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestMerge.class */
public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
    public TestMerge(String str, String str2, Map<String, String> map, String str3, boolean z, String str4) {
        super(str, str2, map, str3, z, str4);
    }

    @BeforeClass
    public static void setupSparkConf() {
        spark.conf().set("spark.sql.shuffle.partitions", "4");
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
        sql("DROP TABLE IF EXISTS source", new Object[0]);
    }

    @Test
    public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED AND (s.id >=2) THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithOnlyUpdateClause() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-six\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{6, "emp-id-six"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithOnlyDeleteClause() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 6 THEN   DELETE", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-one"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithAllCauses() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithAllCausesWithExplicitColumnSpecification() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET t.id = s.id, t.dep = s.dep WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT (t.id, t.dep) VALUES (s.id, s.dep)", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithSourceCTE() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-two\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-3\" }\n{ \"id\": 1, \"dep\": \"emp-id-2\" }\n{ \"id\": 5, \"dep\": \"emp-id-6\" }");
        sql("WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) MERGE INTO %s AS t USING cte1 AS s ON t.id == s.id WHEN MATCHED AND t.id = 2 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 3 THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithSourceFromSetOps() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING (%s) AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName, "SELECT * FROM source WHERE id = 2 UNION ALL SELECT * FROM source WHERE id = 1 OR id = 6"});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRow() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        AssertHelpers.assertThrows("Should complain non iceberg target table", SparkException.class, "a single row from the target table with multiple rows of the source table", () -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
        });
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithDisabledCardinalityCheck() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        try {
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", new Object[]{this.tableName, "write.merge.cardinality-check.enabled", false});
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", new Object[]{this.tableName, "write.merge.cardinality-check.enabled", true});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{this.tableName}));
        } catch (Throwable th) {
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", new Object[]{this.tableName, "write.merge.cardinality-check.enabled", true});
            throw th;
        }
    }

    @Test
    public void testMergeWithUnconditionalDelete() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithSingleConditionalDelete() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        AssertHelpers.assertThrows("Should complain non iceberg target table", SparkException.class, "a single row from the target table with multiple rows of the source table", () -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
        });
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithIdentityTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD identity(dep)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
            removeTables();
        }
    }

    @Test
    public void testMergeWithDaysTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, ts TIMESTAMP");
            sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "id INT, ts TIMESTAMP", "{ \"id\": 1, \"ts\": \"2000-01-01 00:00:00\" }\n{ \"id\": 6, \"ts\": \"2000-01-06 00:00:00\" }");
            createOrReplaceView("source", "id INT, ts TIMESTAMP", "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "2001-01-01 00:00:00"}), row(new Object[]{2, "2001-01-02 00:00:00"})), sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", new Object[]{this.tableName}));
            removeTables();
        }
    }

    @Test
    public void testMergeWithBucketTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
            removeTables();
        }
    }

    @Test
    public void testMergeWithTruncateTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
            removeTables();
        }
    }

    @Test
    public void testMergeIntoPartitionedAndOrderedTable() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
            sql("ALTER TABLE %s WRITE ORDERED BY (id)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
            removeTables();
        }
    }

    @Test
    public void testSelfMerge() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        sql("MERGE INTO %s t USING %s s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET v = 'x' WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName, this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "x"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithSourceAsSelfSubquery() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
        sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s ON t.id == s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET v = 'x' WHEN NOT MATCHED THEN   INSERT (v, id) VALUES ('invalid', -1) ", new Object[]{this.tableName, this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "x"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
        Assume.assumeFalse(this.catalogName.equalsIgnoreCase("testhadoop"));
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.merge.isolation-level", "serializable"});
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{this.tableName});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
                atomicInteger.incrementAndGet();
            }
        });
        try {
            try {
                submit.get();
                Assert.fail("Expected a validation exception");
                submit2.cancel(true);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Assert.assertThat(cause, CoreMatchers.instanceOf(SparkException.class));
                Throwable cause2 = cause.getCause();
                Assert.assertThat(cause2, CoreMatchers.instanceOf(ValidationException.class));
                Assert.assertThat(cause2.getMessage(), CoreMatchers.containsString("Found conflicting files that can contain"));
                submit2.cancel(true);
            }
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
        Assume.assumeFalse(this.catalogName.equalsIgnoreCase("testhadoop"));
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.merge.isolation-level", "snapshot"});
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < 20; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{this.tableName});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            for (int i = 0; i < 20; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
                atomicInteger.incrementAndGet();
            }
        });
        try {
            submit.get();
            submit2.cancel(true);
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public void testMergeWithExtraColumnsInSource() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{3, "v3"}), row(new Object[]{4, "v4"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithNullsInTargetAndSource() {
        createAndInitTable("id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": null, \"v\": \"v1_1\" }\n{ \"id\": 4, \"v\": \"v4\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{null, "v1"}), row(new Object[]{null, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{4, "v4"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithNullSafeEquals() {
        createAndInitTable("id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": null, \"v\": \"v1_1\" }\n{ \"id\": 4, \"v\": \"v4\" }");
        sql("MERGE INTO %s t USING source ON t.id <=> source.id WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{null, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{4, "v4"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithNullCondition() {
        createAndInitTable("id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": null, \"v\": \"v1_1\" }\n{ \"id\": 2, \"v\": \"v2_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id AND NULL WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{null, "v1"}), row(new Object[]{null, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{2, "v2_2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithNullActionConditions() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"v\": \"v1_1\" }\n{ \"id\": 2, \"v\": \"v2_2\" }\n{ \"id\": 3, \"v\": \"v3_3\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED AND source.id = 1 AND NULL THEN   UPDATE SET v = source.v WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN   DELETE WHEN NOT MATCHED AND source.id = 3 AND NULL THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "v1"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{this.tableName}));
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED AND source.id = 1 AND NULL THEN   UPDATE SET v = source.v WHEN MATCHED AND source.v = 'v1_1' THEN   DELETE WHEN NOT MATCHED AND source.id = 3 AND NULL THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithMultipleMatchingActions() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"v\": \"v1_1\" }\n{ \"id\": 2, \"v\": \"v2_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED AND source.id = 1 THEN   UPDATE SET v = source.v WHEN MATCHED AND source.v = 'v1_1' THEN   DELETE WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "v1_1"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
        Assume.assumeTrue(this.fileFormat.equalsIgnoreCase("parquet"));
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", new Object[]{this.tableName, "write.parquet.row-group-size-bytes", 100});
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", new Object[]{this.tableName, "read.split.target-size", 100});
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 1; i <= 200; i++) {
            newArrayList.add(Integer.valueOf(i));
        }
        spark.createDataset(newArrayList, Encoders.INT()).withColumnRenamed("value", "id").withColumn("dep", functions.lit("hr")).coalesce(1).writeTo(this.tableName).append();
        Assert.assertEquals(200L, spark.table(this.tableName).count());
        sql("MERGE INTO %s t USING source ON t.id == source.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{this.tableName});
        Assert.assertEquals(200L, spark.table(this.tableName).count());
    }

    @Test
    public void testMergeInsertOnly() {
        createAndInitTable("id STRING, v STRING", "{ \"id\": \"a\", \"v\": \"v1\" }\n{ \"id\": \"b\", \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": \"a\", \"v\": \"v1_1\" }\n{ \"id\": \"a\", \"v\": \"v1_2\" }\n{ \"id\": \"c\", \"v\": \"v3\" }\n{ \"id\": \"d\", \"v\": \"v4_1\" }\n{ \"id\": \"d\", \"v\": \"v4_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{"a", "v1"}), row(new Object[]{"b", "v2"}), row(new Object[]{"c", "v3"}), row(new Object[]{"d", "v4_1"}), row(new Object[]{"d", "v4_2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeInsertOnlyWithCondition() {
        createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }");
        createOrReplaceView("source", "{ \"id\": 1, \"v\": 11, \"is_new\": true }\n{ \"id\": 2, \"v\": 21, \"is_new\": true }\n{ \"id\": 2, \"v\": 22, \"is_new\": false }");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN NOT MATCHED AND is_new = TRUE THEN   INSERT (v, id) VALUES (s.v + 100, s.id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, 1}), row(new Object[]{2, 121})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeAlignsUpdateAndInsertActions() {
        createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET b = c2, a = c1, t.id = source.id WHEN NOT MATCHED THEN   INSERT (b, a, id) VALUES (c2, c1, id)", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, -2, "new_str_1"}), row(new Object[]{2, -20, "new_str_2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeUpdatesNestedStructFields() {
        createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>", "{ \"id\": 1, \"s\": { \"c1\": 2, \"c2\": { \"a\": [1,2], \"m\": { \"a\": \"b\"} } } } }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2 }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s.c1 = source.c1, t.s.c2.a = array(-1, -2), t.s.c2.m = map('k', 'v')", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{-2, row(new Object[]{ImmutableList.of(-1, -2), ImmutableMap.of("k", "v")})})})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s.c1 = NULL, t.s.c2 = NULL", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{null, null})})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s = named_struct('c1', 100, 'c2', named_struct('a', array(1), 'm', map('x', 'y')))", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{100, row(new Object[]{ImmutableList.of(1), ImmutableMap.of("x", "y")})})})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeWithInferredCasts() {
        createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2}");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s = source.c1", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeModifiesNullStruct() {
        createAndInitTable("id INT, s STRUCT<n1:INT,n2:INT>", "{ \"id\": 1, \"s\": null }");
        createOrReplaceView("source", "{ \"id\": 1, \"n1\": -10 }");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET t.s.n1 = s.n1", new Object[]{this.tableName});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{-10, null})})), sql("SELECT * FROM %s", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeRefreshesRelationCache() {
        createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");
        spark.sql("SELECT name FROM " + this.tableName).createOrReplaceTempView("tmp");
        spark.sql("CACHE TABLE tmp");
        assertEquals("View should have correct data", ImmutableList.of(row(new Object[]{"n1"})), sql("SELECT * FROM tmp", new Object[0]));
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET t.name = s.name", new Object[]{this.tableName});
        assertEquals("View should have correct data", ImmutableList.of(row(new Object[]{"n2"})), sql("SELECT * FROM tmp", new Object[0]));
        spark.sql("UNCACHE TABLE tmp");
    }

    @Test
    public void testMergeWithNonExistingColumns() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about the invalid top-level column", AnalysisException.class, "cannot resolve '`t.invalid_col`'", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.invalid_col = s.c2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about the invalid nested column", AnalysisException.class, "No such struct field invalid_col", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.invalid_col = s.c2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about the invalid top-level column", AnalysisException.class, "cannot resolve '`invalid_col`'", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.dn1 = s.c2 WHEN NOT MATCHED THEN   INSERT (id, invalid_col) VALUES (s.c1, null)", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithInvalidColumnsInInsert() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about the nested column", AnalysisException.class, "Nested fields are not supported inside INSERT clauses", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.dn1 = s.c2 WHEN NOT MATCHED THEN   INSERT (id, c.n2) VALUES (s.c1, null)", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about duplicate columns", AnalysisException.class, "Duplicate column names inside INSERT clause", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.dn1 = s.c2 WHEN NOT MATCHED THEN   INSERT (id, id) VALUES (s.c1, null)", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about missing columns", AnalysisException.class, "must provide values for all columns of the target table", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED THEN   INSERT (id) VALUES (s.c1)", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithInvalidUpdates() {
        createAndInitTable("id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about updating an array column", AnalysisException.class, "Updating nested fields is only supported for structs", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.a.c1 = s.c2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about updating a map column", AnalysisException.class, "Updating nested fields is only supported for structs", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.m.key = 'new_key'", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithConflictingUpdates() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about conflicting updates to a top-level column", AnalysisException.class, "Updates are in conflict", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column", AnalysisException.class, "Updates are in conflict for these columns", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column", AnalysisException.class, "Updates are in conflict", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithInvalidAssignments() {
        createAndInitTable("id INT NOT NULL, s STRUCT<n1:INT NOT NULL,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL");
        createOrReplaceView("source", "c1 INT, c2 STRUCT<n1:INT NOT NULL> NOT NULL, c3 STRING NOT NULL, c4 STRUCT<dn2:INT,dn1:INT>", "{ \"c1\": -100, \"c2\": { \"n1\" : 1 }, \"c3\" : 'str', \"c4\": { \"dn2\": 1, \"dn2\": 2 } }");
        for (String str : new String[]{"ansi", "strict"}) {
            withSQLConf(ImmutableMap.of("spark.sql.storeAssignmentPolicy", str), () -> {
                AssertHelpers.assertThrows("Should complain about writing nulls to a top-level column", AnalysisException.class, "Cannot write nullable values to non-null column", () -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.id = NULL", new Object[]{this.tableName});
                });
                AssertHelpers.assertThrows("Should complain about writing nulls to a nested column", AnalysisException.class, "Cannot write nullable values to non-null column", () -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s.n1 = NULL", new Object[]{this.tableName});
                });
                AssertHelpers.assertThrows("Should complain about writing missing fields in structs", AnalysisException.class, "missing fields", () -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s = s.c2", new Object[]{this.tableName});
                });
                AssertHelpers.assertThrows("Should complain about writing invalid data types", AnalysisException.class, "Cannot safely cast", () -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s.n1 = s.c3", new Object[]{this.tableName});
                });
                AssertHelpers.assertThrows("Should complain about writing incompatible structs", AnalysisException.class, "field name does not match", () -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s.n2 = s.c4", new Object[]{this.tableName});
                });
            });
        }
    }

    @Test
    public void testMergeWithNonDeterministicConditions() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about non-deterministic search conditions", AnalysisException.class, "nondeterministic expressions are only allowed in", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 AND rand() > t.id WHEN MATCHED THEN   UPDATE SET t.c.n1 = -1", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about non-deterministic update conditions", AnalysisException.class, "nondeterministic expressions are only allowed in", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND rand() > t.id THEN   UPDATE SET t.c.n1 = -1", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about non-deterministic delete conditions", AnalysisException.class, "nondeterministic expressions are only allowed in", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND rand() > t.id THEN   DELETE", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about non-deterministic insert conditions", AnalysisException.class, "nondeterministic expressions are only allowed in", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED AND rand() > c1 THEN   INSERT (id, c) VALUES (1, null)", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithAggregateExpressions() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about agg expressions in search conditions", AnalysisException.class, "contains one or more unsupported", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 AND max(t.id) == 1 WHEN MATCHED THEN   UPDATE SET t.c.n1 = -1", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about agg expressions in update conditions", AnalysisException.class, "contains one or more unsupported", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND sum(t.id) < 1 THEN   UPDATE SET t.c.n1 = -1", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about non-deterministic delete conditions", AnalysisException.class, "contains one or more unsupported", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND sum(t.id) THEN   DELETE", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about non-deterministic insert conditions", AnalysisException.class, "contains one or more unsupported", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED AND sum(c1) < 1 THEN   INSERT (id, c) VALUES (1, null)", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithSubqueriesInConditions() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain about subquery expressions", AnalysisException.class, "Subqueries are not supported in conditions", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM source) WHEN MATCHED THEN   UPDATE SET t.c.n1 = s.c2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about subquery expressions", AnalysisException.class, "Subqueries are not supported in conditions", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND t.id < (SELECT max(c2) FROM source) THEN   UPDATE SET t.c.n1 = s.c2", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about subquery expressions", AnalysisException.class, "Subqueries are not supported in conditions", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM source) THEN   DELETE", new Object[]{this.tableName});
        });
        AssertHelpers.assertThrows("Should complain about subquery expressions", AnalysisException.class, "Subqueries are not supported in conditions", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM source) THEN   INSERT (id, c) VALUES (1, null)", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithTargetColumnsInInsertCondtions() {
        createAndInitTable("id INT, c2 INT");
        createOrReplaceView("source", "{ \"id\": 1, \"value\": 11 }");
        AssertHelpers.assertThrows("Should complain about the target column", AnalysisException.class, "cannot resolve '`c2`'", () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN NOT MATCHED AND c2 = 1 THEN   INSERT (id, c2) VALUES (s.id, null)", new Object[]{this.tableName});
        });
    }

    @Test
    public void testMergeWithNonIcebergTargetTableNotSupported() {
        createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        AssertHelpers.assertThrows("Should complain non iceberg target table", UnsupportedOperationException.class, "MERGE INTO TABLE is not supported temporarily.", () -> {
            sql("MERGE INTO target t USING source s ON t.c1 == s.c1 WHEN MATCHED THEN   UPDATE SET *", new Object[0]);
        });
    }

    @Test
    public void testMergeSinglePartitionPartitioning() {
        createAndInitTable("id INT", "{\"id\": -1}");
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{this.tableName});
        assertEquals("Should correctly add the non-matching rows", ImmutableList.of(row(new Object[]{-1}), row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{3}), row(new Object[]{4})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testMergeEmptyTable() {
        createAndInitTable("id INT", null);
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{this.tableName});
        assertEquals("Should correctly add the non-matching rows", ImmutableList.of(row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{3}), row(new Object[]{4})), sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
    }

    @Test
    public void testFileFilterMetric() throws Exception {
        createAndInitTable("id INT, dep STRING");
        spark.sql(String.format("INSERT INTO %s VALUES (1, 'emp-id-one')", this.tableName));
        spark.sql(String.format("INSERT INTO %s VALUES (6, 'emp-id-six')", this.tableName));
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("candidate files", "2");
        newHashMap.put("matching files", "1");
        checkMetrics(() -> {
            return spark.sql(String.format("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN UPDATE SET * ", this.tableName));
        }, newHashMap);
    }
}
