package org.apache.iceberg.spark.extensions;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.class */
public class TestCopyOnWriteMerge extends TestMerge {
    public TestCopyOnWriteMerge(String str, String str2, Map<String, String> map, String str3, boolean z, String str4, String str5) {
        super(str, str2, map, str3, z, str4, str5);
    }

    @Override // org.apache.iceberg.spark.extensions.SparkRowLevelOperationsTestBase
    protected Map<String, String> extraTableProperties() {
        return ImmutableMap.of("write.merge.mode", RowLevelOperationMode.COPY_ON_WRITE.modeName());
    }

    @Test
    public synchronized void testMergeWithConcurrentTableRefresh() throws Exception {
        Assume.assumeTrue(this.catalogName.equalsIgnoreCase("testhive"));
        createAndInitTable("id INT, dep STRING");
        createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.merge.isolation-level", "snapshot"});
        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        createBranchIfNeeded();
        Table loadIcebergTable = Spark3Util.loadIcebergTable(spark, this.tableName);
        ExecutorService exitingExecutorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(2));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Future<?> submit = exitingExecutorService.submit(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{this.tableName});
                atomicInteger.incrementAndGet();
            }
        });
        Future<?> submit2 = exitingExecutorService.submit(() -> {
            GenericRecord create = GenericRecord.create(loadIcebergTable.schema());
            create.set(0, 1);
            create.set(1, "hr");
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                while (atomicBoolean.get() && atomicInteger.get() < i * 2) {
                    sleep(10L);
                }
                if (!atomicBoolean.get()) {
                    return;
                }
                for (int i2 = 0; i2 < 5; i2++) {
                    loadIcebergTable.newFastAppend().appendFile(writeDataFile(loadIcebergTable, ImmutableList.of(create))).commit();
                    sleep(10L);
                }
                atomicInteger.incrementAndGet();
            }
        });
        try {
            Objects.requireNonNull(submit);
            Assertions.assertThatThrownBy(submit::get).isInstanceOf(ExecutionException.class).cause().isInstanceOf(IllegalStateException.class).hasMessageContaining("the table has been concurrently modified");
            atomicBoolean.set(false);
            submit2.cancel(true);
            exitingExecutorService.shutdown();
            Assert.assertTrue("Timeout", exitingExecutorService.awaitTermination(2L, TimeUnit.MINUTES));
        } catch (Throwable th) {
            atomicBoolean.set(false);
            submit2.cancel(true);
            throw th;
        }
    }

    @Test
    public void testRuntimeFilteringWithReportedPartitioning() {
        createAndInitTable("id INT, dep STRING");
        sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 3, \"dep\": \"hr\" }");
        createBranchIfNeeded();
        append(commitTarget(), "{ \"id\": 1, \"dep\": \"hardware\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        createOrReplaceView("source", Collections.singletonList(2), Encoders.INT());
        withSQLConf(ImmutableMap.of(SQLConf.V2_BUCKETING_ENABLED().key(), "true", "spark.sql.iceberg.planning.preserve-data-grouping", "true"), () -> {
            sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET id = -1", new Object[]{commitTarget()});
        });
        Table loadTable = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals("Should have 3 snapshots", 3L, Iterables.size(loadTable.snapshots()));
        validateCopyOnWrite(SnapshotUtil.latestSnapshot(loadTable, this.branch), "1", "1", "1");
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{-1, "hardware"}), row(new Object[]{1, "hardware"}), row(new Object[]{1, "hr"}), row(new Object[]{3, "hr"})), sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{selectTarget()}));
    }
}
