package org.apache.iceberg.spark.extensions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestMerge.class */
public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
    public TestMerge(String str, String str2, Map<String, String> map, String str3, boolean z, String str4, boolean z2, String str5, PlanningMode planningMode) {
        super(str, str2, map, str3, z, str4, z2, str5, planningMode);
    }

    @BeforeClass
    public static void setupSparkConf() {
        spark.conf().set("spark.sql.shuffle.partitions", "4");
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
        sql("DROP TABLE IF EXISTS source", new Object[0]);
    }

    @Test
    public void testMergeWithVectorizedReads() {
        Assumptions.assumeThat(supportsVectorization()).isTrue();
        createAndInitTable("id INT, value INT, dep STRING", "PARTITIONED BY (dep)", "{ \"id\": 1, \"value\": 100, \"dep\": \"hr\" }\n{ \"id\": 6, \"value\": 600, \"dep\": \"software\" }");
        createOrReplaceView("source", "id INT, value INT", "{ \"id\": 2, \"value\": 201 }\n{ \"id\": 1, \"value\": 101 }\n{ \"id\": 6, \"value\": 601 }");
        assertAllBatchScansVectorized(executeAndKeepPlan("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET t.value = s.value WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT (id, value, dep) VALUES (s.id, s.value, 'invalid')", new Object[]{commitTarget()}));
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 101, "hr"}), row(new Object[]{2, 201, "invalid"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testCoalesceMerge() {
        createAndInitTable("id INT, salary INT, dep STRING");
        String[] strArr = new String[100];
        for (int i = 0; i < 100; i++) {
            strArr[i] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", Integer.valueOf(i));
        }
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("read.split.open-file-cost", String.valueOf(Integer.MAX_VALUE), "write.merge.distribution-mode", DistributionMode.NONE.modeName()))});
        createBranchIfNeeded();
        spark.range(0L, 100L).createOrReplaceTempView("source");
        withSQLConf(ImmutableMap.of(SQLConf.SHUFFLE_PARTITIONS().key(), "200", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1", SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), () -> {
            sql("MERGE INTO %s t USING source ON t.id = source.id WHEN MATCHED THEN   UPDATE SET salary = -1 ", new Object[]{commitTarget()});
        });
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateProperty(latestSnapshot, "added-data-files", "1");
        } else {
            validateProperty(latestSnapshot, "added-delete-files", "1");
        }
        Assert.assertEquals("Row count must match", 400L, scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", new Object[]{commitTarget()}));
    }

    @Test
    public void testSkewMerge() {
        createAndInitTable("id INT, salary INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        String[] strArr = new String[100];
        for (int i = 0; i < 100; i++) {
            strArr[i] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", Integer.valueOf(i));
        }
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        append(this.tableName, strArr);
        sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("read.split.open-file-cost", String.valueOf(Integer.MAX_VALUE), "write.merge.distribution-mode", DistributionMode.HASH.modeName()))});
        createBranchIfNeeded();
        spark.range(0L, 100L).createOrReplaceTempView("source");
        withSQLConf(ImmutableMap.of(SQLConf.SHUFFLE_PARTITIONS().key(), "4", SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100", SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true", SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"), () -> {
            Assertions.assertThat(executeAndKeepPlan("MERGE INTO %s t USING source ON t.id = source.id WHEN MATCHED THEN   UPDATE SET salary = -1 ", new Object[]{commitTarget()}).toString()).contains(new CharSequence[]{"REBALANCE_PARTITIONS_BY_COL"});
        });
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        if (mode(loadTable) == RowLevelOperationMode.COPY_ON_WRITE) {
            validateProperty(latestSnapshot, "added-data-files", "4");
        } else {
            validateProperty(latestSnapshot, "added-delete-files", "4");
        }
        Assert.assertEquals("Row count must match", 400L, scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", new Object[]{commitTarget()}));
    }

    @Test
    public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
        createAndInitTable("id INT, salary INT, dep STRING, sub_dep STRING", "PARTITIONED BY (dep, sub_dep)", "{ \"id\": 1, \"salary\": 100, \"dep\": \"d1\", \"sub_dep\": \"sd1\" }\n{ \"id\": 6, \"salary\": 600, \"dep\": \"d6\", \"sub_dep\": \"sd6\" }");
        createOrReplaceView("source", "id INT, salary INT, dep STRING, sub_dep STRING", "{ \"id\": 1, \"salary\": 101, \"dep\": \"d1\", \"sub_dep\": \"sd1\" }\n{ \"id\": 2, \"salary\": 200, \"dep\": \"d2\", \"sub_dep\": \"sd2\" }\n{ \"id\": 3, \"salary\": 300, \"dep\": \"d3\", \"sub_dep\": \"sd3\"  }");
        String format = String.format("MERGE INTO %s AS t USING source AS s ON t.id == s.id AND ((t.dep = 'd1' AND t.sub_dep IN ('sd1', 'sd3')) OR (t.dep = 'd6' AND t.sub_dep IN ('sd2', 'sd6'))) WHEN MATCHED THEN   UPDATE SET salary = s.salary WHEN NOT MATCHED THEN   INSERT *", commitTarget());
        if (mode(this.validationCatalog.loadTable(this.tableIdent)) == RowLevelOperationMode.COPY_ON_WRITE) {
            checkJoinAndFilterConditions(format, "Join [id], [id], FullOuter", "((dep = 'd1' AND sub_dep IN ('sd1', 'sd3')) OR (dep = 'd6' AND sub_dep IN ('sd2', 'sd6')))");
        } else {
            checkJoinAndFilterConditions(format, "Join [id], [id], RightOuter", "((dep = 'd1' AND sub_dep IN ('sd1', 'sd3')) OR (dep = 'd6' AND sub_dep IN ('sd2', 'sd6')))");
        }
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 101, "d1", "sd1"}), row(new Object[]{2, 200, "d2", "sd2"}), row(new Object[]{3, 300, "d3", "sd3"}), row(new Object[]{6, 600, "d6", "sd6"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithStaticPredicatePushDown() {
        createAndInitTable("id BIGINT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"dep\": \"software\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 1, \"dep\": \"hr\" }");
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadTable, this.branch);
        Assert.assertEquals("Must have 2 files before MERGE", "2", (String) latestSnapshot.summary().get("total-data-files"));
        createOrReplaceView("source", "{ \"id\": 1, \"dep\": \"finance\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        withUnavailableFiles(latestSnapshot.addedDataFiles(loadTable.io()), () -> {
            withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
                sql("MERGE INTO %s t USING source ON t.id == source.id AND t.dep IN ('software') AND source.id < 10 WHEN MATCHED AND source.id = 1 THEN   UPDATE SET dep = source.dep WHEN NOT MATCHED THEN   INSERT (dep, id) VALUES (source.dep, source.id)", new Object[]{commitTarget()});
            });
        });
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1L, "finance"}), row(new Object[]{1L, "hr"}), row(new Object[]{2L, "hardware"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
        Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(this.branch));
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
        Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(this.branch));
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED AND (s.id >=2) THEN   INSERT *", new Object[]{this.tableName});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOnlyUpdateClause() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-six\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{6, "emp-id-six"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOnlyUpdateClauseAndNullValues() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": null, \"dep\": \"emp-id-one\" }\n{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-six\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id AND t.id < 3 WHEN MATCHED THEN   UPDATE SET *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{null, "emp-id-one"}), row(new Object[]{1, "emp-id-1"}), row(new Object[]{6, "emp-id-six"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOnlyUpdateNullUnmatchedValues() {
        createAndInitTable("id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n{ \"id\": 6, \"value\": null }");
        createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET id=123, value=456", new Object[]{commitTarget()});
        sql("SELECT * FROM %s", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{6, null}), row(new Object[]{123, 456})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
        createAndInitTable("id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n{ \"id\": 6, \"value\": null }");
        createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET id=123", new Object[]{commitTarget()});
        sql("SELECT * FROM %s", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{6, null}), row(new Object[]{123, 2})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOnlyDeleteNullUnmatchedValues() {
        createAndInitTable("id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n{ \"id\": 6, \"value\": null }");
        createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN DELETE", new Object[]{commitTarget()});
        sql("SELECT * FROM %s", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{6, null})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOnlyDeleteClause() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 6 THEN   DELETE", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-one"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithAllCauses() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithAllCausesWithExplicitColumnSpecification() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET t.id = s.id, t.dep = s.dep WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT (t.id, t.dep) VALUES (s.id, s.dep)", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithSourceCTE() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-two\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-3\" }\n{ \"id\": 1, \"dep\": \"emp-id-2\" }\n{ \"id\": 5, \"dep\": \"emp-id-6\" }");
        sql("WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) MERGE INTO %s AS t USING cte1 AS s ON t.id == s.id WHEN MATCHED AND t.id = 2 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 3 THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithSourceFromSetOps() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING (%s) AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget(), "SELECT * FROM source WHERE id = 2 UNION ALL SELECT * FROM source WHERE id = 1 OR id = 6"});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithOneMatchingBranchButMultipleSourceRowsForTargetRow() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"state\": \"on\" }\n{ \"id\": 1, \"state\": \"off\" }\n{ \"id\": 10, \"state\": \"on\" }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED THEN   INSERT (id, dep) VALUES (s.id, 'unknown')", new Object[]{commitTarget()});
        }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSource() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10000; i++) {
            newArrayList.add(Integer.valueOf(i));
        }
        Dataset createDataset = spark.createDataset(newArrayList, Encoders.INT());
        createDataset.union(createDataset).createOrReplaceTempView("source");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET id = 10 WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.value = 2 THEN   INSERT (id, dep) VALUES (s.value, null)", new Object[]{commitTarget()});
        }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceEnabledHashShuffleJoin() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10000; i++) {
            newArrayList.add(Integer.valueOf(i));
        }
        Dataset createDataset = spark.createDataset(newArrayList, Encoders.INT());
        createDataset.union(createDataset).createOrReplaceTempView("source");
        withSQLConf(ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"), () -> {
            Assertions.assertThatThrownBy(() -> {
                sql("MERGE INTO %s AS t USING source AS s ON t.id == s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET id = 10 WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.value = 2 THEN   INSERT (id, dep) VALUES (s.value, null)", new Object[]{commitTarget()});
            }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        });
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoEqualityCondition() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }");
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10000; i++) {
            newArrayList.add(Integer.valueOf(i));
        }
        Dataset createDataset = spark.createDataset(newArrayList, Encoders.INT());
        createDataset.union(createDataset).createOrReplaceTempView("source");
        withSQLConf(ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"), () -> {
            Assertions.assertThatThrownBy(() -> {
                sql("MERGE INTO %s AS t USING source AS s ON t.id > s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET id = 10 WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.value = 2 THEN   INSERT (id, dep) VALUES (s.value, null)", new Object[]{commitTarget()});
            }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        });
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActions() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10000; i++) {
            newArrayList.add(Integer.valueOf(i));
        }
        Dataset createDataset = spark.createDataset(newArrayList, Encoders.INT());
        createDataset.union(createDataset).createOrReplaceTempView("source");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET id = 10 WHEN MATCHED AND t.id = 6 THEN   DELETE", new Object[]{commitTarget()});
        }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActionsNoEqualityCondition() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }");
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10000; i++) {
            newArrayList.add(Integer.valueOf(i));
        }
        Dataset createDataset = spark.createDataset(newArrayList, Encoders.INT());
        createDataset.union(createDataset).createOrReplaceTempView("source");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id > s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET id = 10 WHEN MATCHED AND t.id = 6 THEN   DELETE", new Object[]{commitTarget()});
        }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleUpdatesForTargetRow() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
        }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithUnconditionalDelete() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithSingleConditionalDelete() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
        }).cause().isInstanceOf(SparkException.class).hasMessageContaining("MERGE statement matched a single row from the target table with multiple rows of the source table.");
        assertEquals("Target should be unchanged", ImmutableList.of(row(new Object[]{1, "emp-id-one"}), row(new Object[]{6, "emp-id-6"})), sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithIdentityTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD identity(dep)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createBranchIfNeeded();
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
            removeTables();
        }
    }

    @Test
    public void testMergeWithDaysTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, ts TIMESTAMP");
            sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "id INT, ts TIMESTAMP", "{ \"id\": 1, \"ts\": \"2000-01-01 00:00:00\" }\n{ \"id\": 6, \"ts\": \"2000-01-06 00:00:00\" }");
            createBranchIfNeeded();
            createOrReplaceView("source", "id INT, ts TIMESTAMP", "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "2001-01-01 00:00:00"}), row(new Object[]{2, "2001-01-02 00:00:00"})), sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", new Object[]{selectTarget()}));
            removeTables();
        }
    }

    @Test
    public void testMergeWithBucketTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createBranchIfNeeded();
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
            removeTables();
        }
    }

    @Test
    public void testMergeWithTruncateTransform() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createBranchIfNeeded();
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
            removeTables();
        }
    }

    @Test
    public void testMergeIntoPartitionedAndOrderedTable() {
        for (DistributionMode distributionMode : DistributionMode.values()) {
            createAndInitTable("id INT, dep STRING");
            sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
            sql("ALTER TABLE %s WRITE ORDERED BY (id)", new Object[]{this.tableName});
            sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.distribution-mode", distributionMode.modeName()});
            append(this.tableName, "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            createBranchIfNeeded();
            createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
            sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
            assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
            removeTables();
        }
    }

    @Test
    public void testSelfMerge() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        sql("MERGE INTO %s t USING %s s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET v = 'x' WHEN NOT MATCHED THEN   INSERT *", new Object[]{commitTarget(), commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "x"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testSelfMergeWithCaching() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        sql("CACHE TABLE %s", new Object[]{this.tableName});
        sql("MERGE INTO %s t USING %s s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET v = 'x' WHEN NOT MATCHED THEN   INSERT *", new Object[]{commitTarget(), commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "x"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{commitTarget()}));
    }

    @Test
    public void testMergeWithSourceAsSelfSubquery() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
        sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s ON t.id == s.value WHEN MATCHED AND t.id = 1 THEN   UPDATE SET v = 'x' WHEN NOT MATCHED THEN   INSERT (v, id) VALUES ('invalid', -1) ", new Object[]{commitTarget(), commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "x"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
        Assume.assumeFalse(this.catalogName.equalsIgnoreCase("testhadoop"));
        Assume.assumeTrue(cachingCatalogEnabled());
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.merge.isolation-level", "serializable"});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{commitTarget()});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
            GenericRecord create = GenericRecord.create(loadTable.schema());
            create.set(0, 1);
            create.set(1, "hr");
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicBoolean.get() && atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                if (!atomicBoolean.get()) {
                    return;
                }
                for (int i2 = 0; i2 < 5; i2++) {
                    AppendFiles appendFile = loadTable.newFastAppend().appendFile(writeDataFile(loadTable, ImmutableList.of(create)));
                    if (this.branch != null) {
                        appendFile.toBranch(this.branch);
                    }
                    appendFile.commit();
                    sleep(10L);
                }
                atomicInteger.incrementAndGet();
            }
        });
        try {
            Objects.requireNonNull(submit);
            Assertions.assertThatThrownBy(submit::get).isInstanceOf(ExecutionException.class).cause().isInstanceOf(ValidationException.class).hasMessageContaining("Found conflicting files that can contain");
            atomicBoolean.set(false);
            submit2.cancel(true);
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            atomicBoolean.set(false);
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
        Assume.assumeFalse(this.catalogName.equalsIgnoreCase("testhadoop"));
        Assume.assumeTrue(cachingCatalogEnabled());
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.merge.isolation-level", "snapshot"});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < 20; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{commitTarget()});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
            GenericRecord create = GenericRecord.create(loadTable.schema());
            create.set(0, 1);
            create.set(1, "hr");
            for (int i = 0; i < 20; i++) {
                while (atomicBoolean.get() && atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                if (!atomicBoolean.get()) {
                    return;
                }
                for (int i2 = 0; i2 < 5; i2++) {
                    AppendFiles appendFile = loadTable.newFastAppend().appendFile(writeDataFile(loadTable, ImmutableList.of(create)));
                    if (this.branch != null) {
                        appendFile.toBranch(this.branch);
                    }
                    appendFile.commit();
                    sleep(10L);
                }
                atomicInteger.incrementAndGet();
            }
        });
        try {
            submit.get();
            atomicBoolean.set(false);
            submit2.cancel(true);
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            atomicBoolean.set(false);
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public void testMergeWithExtraColumnsInSource() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{3, "v3"}), row(new Object[]{4, "v4"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithNullsInTargetAndSource() {
        createAndInitTable("id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": null, \"v\": \"v1_1\" }\n{ \"id\": 4, \"v\": \"v4\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{null, "v1"}), row(new Object[]{null, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{4, "v4"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithNullSafeEquals() {
        createAndInitTable("id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": null, \"v\": \"v1_1\" }\n{ \"id\": 4, \"v\": \"v4\" }");
        sql("MERGE INTO %s t USING source ON t.id <=> source.id WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{null, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{4, "v4"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithNullCondition() {
        createAndInitTable("id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": null, \"v\": \"v1_1\" }\n{ \"id\": 2, \"v\": \"v2_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id AND NULL WHEN MATCHED THEN   UPDATE SET v = source.v WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{null, "v1"}), row(new Object[]{null, "v1_1"}), row(new Object[]{2, "v2"}), row(new Object[]{2, "v2_2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithNullActionConditions() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"v\": \"v1_1\" }\n{ \"id\": 2, \"v\": \"v2_2\" }\n{ \"id\": 3, \"v\": \"v3_3\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED AND source.id = 1 AND NULL THEN   UPDATE SET v = source.v WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN   DELETE WHEN NOT MATCHED AND source.id = 3 AND NULL THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "v1"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{selectTarget()}));
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED AND source.id = 1 AND NULL THEN   UPDATE SET v = source.v WHEN MATCHED AND source.v = 'v1_1' THEN   DELETE WHEN NOT MATCHED AND source.id = 3 AND NULL THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleMatchingActions() {
        createAndInitTable("id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n{ \"id\": 2, \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"v\": \"v1_1\" }\n{ \"id\": 2, \"v\": \"v2_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED AND source.id = 1 THEN   UPDATE SET v = source.v WHEN MATCHED AND source.v = 'v1_1' THEN   DELETE WHEN NOT MATCHED THEN   INSERT (v, id) VALUES (source.v, source.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "v1_1"}), row(new Object[]{2, "v2"})), sql("SELECT * FROM %s ORDER BY v", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
        Assume.assumeTrue(this.fileFormat.equalsIgnoreCase("parquet"));
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", new Object[]{this.tableName, "write.parquet.row-group-size-bytes", 100});
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", new Object[]{this.tableName, "read.split.target-size", 100});
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(200);
        for (int i = 1; i <= 200; i++) {
            newArrayListWithCapacity.add(Integer.valueOf(i));
        }
        spark.createDataset(newArrayListWithCapacity, Encoders.INT()).withColumnRenamed("value", "id").withColumn("dep", functions.lit("hr")).coalesce(1).writeTo(this.tableName).append();
        createBranchIfNeeded();
        Assert.assertEquals(200L, spark.table(commitTarget()).count());
        sql("MERGE INTO %s t USING source ON t.id == source.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{commitTarget()});
        Assert.assertEquals(200L, spark.table(commitTarget()).count());
    }

    @Test
    public void testMergeInsertOnly() {
        createAndInitTable("id STRING, v STRING", "{ \"id\": \"a\", \"v\": \"v1\" }\n{ \"id\": \"b\", \"v\": \"v2\" }");
        createOrReplaceView("source", "{ \"id\": \"a\", \"v\": \"v1_1\" }\n{ \"id\": \"a\", \"v\": \"v1_2\" }\n{ \"id\": \"c\", \"v\": \"v3\" }\n{ \"id\": \"d\", \"v\": \"v4_1\" }\n{ \"id\": \"d\", \"v\": \"v4_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN NOT MATCHED THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{"a", "v1"}), row(new Object[]{"b", "v2"}), row(new Object[]{"c", "v3"}), row(new Object[]{"d", "v4_1"}), row(new Object[]{"d", "v4_2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeInsertOnlyWithCondition() {
        createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }");
        createOrReplaceView("source", "{ \"id\": 1, \"v\": 11, \"is_new\": true }\n{ \"id\": 2, \"v\": 21, \"is_new\": true }\n{ \"id\": 2, \"v\": 22, \"is_new\": false }");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN NOT MATCHED AND is_new = TRUE THEN   INSERT (v, id) VALUES (s.v + 100, s.id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, 1}), row(new Object[]{2, 121})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeAlignsUpdateAndInsertActions() {
        createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET b = c2, a = c1, t.id = source.id WHEN NOT MATCHED THEN   INSERT (b, a, id) VALUES (c2, c1, id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, -2, "new_str_1"}), row(new Object[]{2, -20, "new_str_2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeMixedCaseAlignsUpdateAndInsertActions() {
        createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
        sql("MERGE INTO %s t USING source ON t.iD == source.Id WHEN MATCHED THEN   UPDATE SET B = c2, A = c1, t.Id = source.ID WHEN NOT MATCHED THEN   INSERT (b, A, iD) VALUES (c2, c1, id)", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, -2, "new_str_1"}), row(new Object[]{2, -20, "new_str_2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, -2, "new_str_1"})), sql("SELECT * FROM %s WHERE id = 1 ORDER BY id", new Object[]{selectTarget()}));
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{2, -20, "new_str_2"})), sql("SELECT * FROM %s WHERE b = 'new_str_2'ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeUpdatesNestedStructFields() {
        createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>", "{ \"id\": 1, \"s\": { \"c1\": 2, \"c2\": { \"a\": [1,2], \"m\": { \"a\": \"b\"} } } } }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2 }");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s.c1 = source.c1, t.s.c2.a = array(-1, -2), t.s.c2.m = map('k', 'v')", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{-2, row(new Object[]{ImmutableList.of(-1, -2), ImmutableMap.of("k", "v")})})})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s.c1 = NULL, t.s.c2 = NULL", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{null, null})})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s = named_struct('c1', 100, 'c2', named_struct('a', array(1), 'm', map('x', 'y')))", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{100, row(new Object[]{ImmutableList.of(1), ImmutableMap.of("x", "y")})})})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithInferredCasts() {
        createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2}");
        sql("MERGE INTO %s t USING source ON t.id == source.id WHEN MATCHED THEN   UPDATE SET t.s = source.c1", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, "-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeModifiesNullStruct() {
        createAndInitTable("id INT, s STRUCT<n1:INT,n2:INT>", "{ \"id\": 1, \"s\": null }");
        createOrReplaceView("source", "{ \"id\": 1, \"n1\": -10 }");
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET t.s.n1 = s.n1", new Object[]{commitTarget()});
        assertEquals("Output should match", ImmutableList.of(row(new Object[]{1, row(new Object[]{-10, null})})), sql("SELECT * FROM %s", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeRefreshesRelationCache() {
        createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
        createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");
        spark.sql("SELECT name FROM " + commitTarget()).createOrReplaceTempView("tmp");
        spark.sql("CACHE TABLE tmp");
        assertEquals("View should have correct data", ImmutableList.of(row(new Object[]{"n1"})), sql("SELECT * FROM tmp", new Object[0]));
        sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET t.name = s.name", new Object[]{commitTarget()});
        assertEquals("View should have correct data", ImmutableList.of(row(new Object[]{"n2"})), sql("SELECT * FROM tmp", new Object[0]));
        spark.sql("UNCACHE TABLE tmp");
    }

    @Test
    public void testMergeWithMultipleNotMatchedActions() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED AND s.id = 1 THEN   INSERT (dep, id) VALUES (s.dep, -1)WHEN NOT MATCHED THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "emp-id-1"}), row(new Object[]{0, "emp-id-0"}), row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithMultipleConditionalNotMatchedActions() {
        createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED AND s.id = 1 THEN   INSERT (dep, id) VALUES (s.dep, -1)WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "emp-id-1"}), row(new Object[]{0, "emp-id-0"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeResolvesColumnsByName() {
        createAndInitTable("id INT, badge INT, dep STRING", "{ \"id\": 1, \"badge\": 1000, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"badge\": 6000, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "badge INT, id INT, dep STRING", "{ \"badge\": 1001, \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"badge\": 6006, \"id\": 6, \"dep\": \"emp-id-6\" }\n{ \"badge\": 7007, \"id\": 7, \"dep\": \"emp-id-7\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN   UPDATE SET * WHEN NOT MATCHED THEN   INSERT * ", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 1001, "emp-id-1"}), row(new Object[]{6, 6006, "emp-id-6"}), row(new Object[]{7, 7007, "emp-id-7"})), sql("SELECT id, badge, dep FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeShouldResolveWhenThereAreNoUnresolvedExpressionsOrColumns() {
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 3, \"dep\": \"emp-id-3\" }");
        sql("MERGE INTO %s AS t USING source AS s ON 1 != 1 WHEN MATCHED THEN   UPDATE SET * WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName});
        createBranchIfNeeded();
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"}), row(new Object[]{3, "emp-id-3"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithTableWithNonNullableColumn() {
        createAndInitTable("id INT NOT NULL, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        createOrReplaceView("source", "id INT NOT NULL, dep STRING", "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n{ \"id\": 1, \"dep\": \"emp-id-1\" }\n{ \"id\": 6, \"dep\": \"emp-id-6\" }");
        sql("MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED AND t.id = 1 THEN   UPDATE SET * WHEN MATCHED AND t.id = 6 THEN   DELETE WHEN NOT MATCHED AND s.id = 2 THEN   INSERT *", new Object[]{commitTarget()});
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, "emp-id-1"}), row(new Object[]{2, "emp-id-2"})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeWithNonExistingColumns() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.invalid_col = s.c2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("cannot resolve t.invalid_col in MERGE command");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.invalid_col = s.c2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("No such struct field `invalid_col`");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.dn1 = s.c2 WHEN NOT MATCHED THEN   INSERT (id, invalid_col) VALUES (s.c1, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("cannot resolve invalid_col in MERGE command");
    }

    @Test
    public void testMergeWithInvalidColumnsInInsert() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.dn1 = s.c2 WHEN NOT MATCHED THEN   INSERT (id, c.n2) VALUES (s.c1, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Nested fields are not supported inside INSERT clauses");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n2.dn1 = s.c2 WHEN NOT MATCHED THEN   INSERT (id, id) VALUES (s.c1, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Duplicate column names inside INSERT clause");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED THEN   INSERT (id) VALUES (s.c1)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("must provide values for all columns of the target table");
    }

    @Test
    public void testMergeWithInvalidUpdates() {
        createAndInitTable("id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>", "{ \"id\": 1, \"a\": [ { \"c1\": 2, \"c2\": 3 } ], \"m\": { \"k\": \"v\"} }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.a.c1 = s.c2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Updating nested fields is only supported for structs");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.m.key = 'new_key'", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Updating nested fields is only supported for structs");
    }

    @Test
    public void testMergeWithConflictingUpdates() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Updates are in conflict");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Updates are in conflict for these columns");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Updates are in conflict");
    }

    @Test
    public void testMergeWithInvalidAssignments() {
        createAndInitTable("id INT NOT NULL, s STRUCT<n1:INT NOT NULL,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL", "{ \"id\": 1, \"s\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "c1 INT, c2 STRUCT<n1:INT NOT NULL> NOT NULL, c3 STRING NOT NULL, c4 STRUCT<dn2:INT,dn1:INT>", "{ \"c1\": -100, \"c2\": { \"n1\" : 1 }, \"c3\" : 'str', \"c4\": { \"dn2\": 1, \"dn2\": 2 } }");
        for (String str : new String[]{"ansi", "strict"}) {
            withSQLConf(ImmutableMap.of("spark.sql.storeAssignmentPolicy", str), () -> {
                Assertions.assertThatThrownBy(() -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.id = NULL", new Object[]{commitTarget()});
                }).isInstanceOf(AnalysisException.class).hasMessageContaining("Cannot write nullable values to non-null column");
                Assertions.assertThatThrownBy(() -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s.n1 = NULL", new Object[]{commitTarget()});
                }).isInstanceOf(AnalysisException.class).hasMessageContaining("Cannot write nullable values to non-null column");
                Assertions.assertThatThrownBy(() -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s = s.c2", new Object[]{commitTarget()});
                }).isInstanceOf(AnalysisException.class).hasMessageContaining("missing fields");
                Assertions.assertThatThrownBy(() -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s.n1 = s.c3", new Object[]{commitTarget()});
                }).isInstanceOf(AnalysisException.class).hasMessageEndingWith("Cannot safely cast 'n1': string to int");
                Assertions.assertThatThrownBy(() -> {
                    sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED THEN   UPDATE SET t.s.n2 = s.c4", new Object[]{commitTarget()});
                }).isInstanceOf(AnalysisException.class).hasMessageContaining("field name does not match");
            });
        }
    }

    @Test
    public void testMergeWithNonDeterministicConditions() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 AND rand() > t.id WHEN MATCHED THEN   UPDATE SET t.c.n1 = -1", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Non-deterministic functions are not supported in SEARCH conditions of MERGE operations");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND rand() > t.id THEN   UPDATE SET t.c.n1 = -1", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Non-deterministic functions are not supported in UPDATE conditions of MERGE operations");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND rand() > t.id THEN   DELETE", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Non-deterministic functions are not supported in DELETE conditions of MERGE operations");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED AND rand() > c1 THEN   INSERT (id, c) VALUES (1, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Non-deterministic functions are not supported in INSERT conditions of MERGE operations");
    }

    @Test
    public void testMergeWithAggregateExpressions() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 AND max(t.id) == 1 WHEN MATCHED THEN   UPDATE SET t.c.n1 = -1", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Agg functions are not supported in SEARCH conditions of MERGE operations");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND sum(t.id) < 1 THEN   UPDATE SET t.c.n1 = -1", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Agg functions are not supported in UPDATE conditions of MERGE operations");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND sum(t.id) THEN   DELETE", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Agg functions are not supported in DELETE conditions of MERGE operations");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED AND sum(c1) < 1 THEN   INSERT (id, c) VALUES (1, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Agg functions are not supported in INSERT conditions of MERGE operations");
    }

    @Test
    public void testMergeWithSubqueriesInConditions() {
        createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM source) WHEN MATCHED THEN   UPDATE SET t.c.n1 = s.c2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Subqueries are not supported in conditions of MERGE operations. Found a subquery in the SEARCH condition");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND t.id < (SELECT max(c2) FROM source) THEN   UPDATE SET t.c.n1 = s.c2", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Subqueries are not supported in conditions of MERGE operations. Found a subquery in the UPDATE condition");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM source) THEN   DELETE", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Subqueries are not supported in conditions of MERGE operations. Found a subquery in the DELETE condition");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.c1 WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM source) THEN   INSERT (id, c) VALUES (1, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Subqueries are not supported in conditions of MERGE operations. Found a subquery in the INSERT condition");
    }

    @Test
    public void testMergeWithTargetColumnsInInsertConditions() {
        createAndInitTable("id INT, c2 INT", "{ \"id\": 1, \"c2\": 2 }");
        createOrReplaceView("source", "{ \"id\": 1, \"value\": 11 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.id WHEN NOT MATCHED AND c2 = 1 THEN   INSERT (id, c2) VALUES (s.id, null)", new Object[]{commitTarget()});
        }).isInstanceOf(AnalysisException.class).hasMessageContaining("Cannot resolve [c2] in INSERT condition of MERGE operation");
    }

    @Test
    public void testMergeWithNonIcebergTargetTableNotSupported() {
        createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }");
        createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO target t USING source s ON t.c1 == s.c1 WHEN MATCHED THEN   UPDATE SET *", new Object[0]);
        }).isInstanceOf(UnsupportedOperationException.class).hasMessage("MERGE INTO TABLE is not supported temporarily.");
    }

    @Test
    public void testMergeSinglePartitionPartitioning() {
        createAndInitTable("id INT", "{\"id\": -1}");
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{commitTarget()});
        assertEquals("Should correctly add the non-matching rows", ImmutableList.of(row(new Object[]{-1}), row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{3}), row(new Object[]{4})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeEmptyTable() {
        Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(this.branch));
        createAndInitTable("id INT", null);
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{commitTarget()});
        assertEquals("Should correctly add the non-matching rows", ImmutableList.of(row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{3}), row(new Object[]{4})), sql("SELECT * FROM %s ORDER BY id", new Object[]{selectTarget()}));
    }

    @Test
    public void testMergeNonExistingBranch() {
        Assume.assumeTrue("Test only applicable to custom branch", "test".equals(this.branch));
        createAndInitTable("id INT", null);
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        Assertions.assertThatThrownBy(() -> {
            sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{commitTarget()});
        }).isInstanceOf(ValidationException.class).hasMessage("Cannot use branch (does not exist): test");
    }

    @Test
    public void testMergeToWapBranch() {
        Assume.assumeTrue("WAP branch only works for table identifier without branch", this.branch == null);
        createAndInitTable("id INT", "{\"id\": -1}");
        ImmutableList of = ImmutableList.of(row(new Object[]{-1}));
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        ImmutableList of2 = ImmutableList.of(row(new Object[]{-1}), row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{3}), row(new Object[]{4}));
        withSQLConf(ImmutableMap.of("spark.wap.branch", "wap"), () -> {
            sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows when reading table", of2, sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
            assertEquals("Should have expected rows when reading WAP branch", of2, sql("SELECT * FROM %s.branch_wap ORDER BY id", new Object[]{this.tableName}));
            assertEquals("Should not modify main branch", of, sql("SELECT * FROM %s.branch_main ORDER BY id", new Object[]{this.tableName}));
        });
        spark.range(3L, 6L).coalesce(1).createOrReplaceTempView("source2");
        ImmutableList of3 = ImmutableList.of(row(new Object[]{-1}), row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{5}));
        withSQLConf(ImmutableMap.of("spark.wap.branch", "wap"), () -> {
            sql("MERGE INTO %s t USING source2 s ON t.id = s.id WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *", new Object[]{this.tableName});
            assertEquals("Should have expected rows when reading table with multiple writes", of3, sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
            assertEquals("Should have expected rows when reading WAP branch with multiple writes", of3, sql("SELECT * FROM %s.branch_wap ORDER BY id", new Object[]{this.tableName}));
            assertEquals("Should not modify main branch with multiple writes", of, sql("SELECT * FROM %s.branch_main ORDER BY id", new Object[]{this.tableName}));
        });
    }

    @Test
    public void testMergeToWapBranchWithTableBranchIdentifier() {
        Assume.assumeTrue("Test must have branch name part in table identifier", this.branch != null);
        createAndInitTable("id INT", "{\"id\": -1}");
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.range(0L, 5L).coalesce(1).createOrReplaceTempView("source");
        ImmutableList.of(row(new Object[]{-1}), row(new Object[]{0}), row(new Object[]{1}), row(new Object[]{2}), row(new Object[]{3}), row(new Object[]{4}));
        withSQLConf(ImmutableMap.of("spark.wap.branch", "wap"), () -> {
            Assertions.assertThatThrownBy(() -> {
                sql("MERGE INTO %s t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *", new Object[]{commitTarget()});
            }).isInstanceOf(ValidationException.class).hasMessage(String.format("Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]", this.branch));
        });
    }

    private void checkJoinAndFilterConditions(String str, String str2, String str3) {
        withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
            String replaceAll = executeAndKeepPlan(() -> {
                sql(str, new Object[0]);
            }).toString().replaceAll("#(\\d+L?)", "");
            ((AbstractStringAssert) Assertions.assertThat(replaceAll).as("Join should match", new Object[0])).contains(new CharSequence[]{str2 + "\n"});
            ((AbstractStringAssert) Assertions.assertThat(replaceAll).as("Pushed filters must match", new Object[0])).contains(new CharSequence[]{"[filters=" + str3 + ","});
        });
    }

    private RowLevelOperationMode mode(Table table) {
        return RowLevelOperationMode.fromName((String) table.properties().getOrDefault("write.merge.mode", TableProperties.MERGE_MODE_DEFAULT));
    }
}
