package org.apache.iceberg.flink;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests for upsert writes issued through Flink SQL.
 *
 * <p>Every test runs once per combination of file format (Parquet, Avro, ORC) and execution mode
 * (streaming or batch) produced by {@link #parameters()}. Tables are created with the properties
 * collected in {@code tableUpsertProps}: {@code format-version=2}, {@code write.upsert.enabled=true}
 * and {@code write.format.default} set to the parameterized file format.
 *
 * <p>Each test drops its own table in a {@code finally} block so that a failing assertion does not
 * leave the table behind.
 */
@RunWith(Parameterized.class)
public class TestFlinkUpsert extends FlinkCatalogTestBase {

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Whether the table environment is created in streaming mode ({@code true}) or batch mode. */
    private final boolean isStreamingJob;

    /** Table properties passed in the WITH clause of the CREATE TABLE statements. */
    private final Map<String, String> tableUpsertProps;

    /**
     * Lazily created table environment; see {@link #getTableEnv()}. Declared {@code volatile} so the
     * unsynchronized first read in {@link #getTableEnv()} is safe.
     */
    private volatile TableEnvironment tEnv;

    /**
     * Creates one parameterized test instance.
     *
     * @param catalogName name of the catalog, forwarded to the base class
     * @param baseNamespace base namespace, forwarded to the base class
     * @param format file format stored under {@code write.format.default}
     * @param isStreamingJob {@code true} to run the SQL in streaming mode, {@code false} for batch
     */
    public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
        super(catalogName, baseNamespace);
        this.isStreamingJob = isStreamingJob.booleanValue();
        this.tableUpsertProps = Maps.newHashMap();
        this.tableUpsertProps.put("format-version", "2");
        this.tableUpsertProps.put("write.upsert.enabled", "true");
        this.tableUpsertProps.put("write.format.default", format.name());
    }

    /**
     * Builds the parameter matrix: every file format crossed with streaming and batch mode, always
     * using the {@code testhadoop} catalog name and the {@code TestFixtures.DATABASE} namespace.
     *
     * @return one {@code Object[]} of constructor arguments per test instance
     */
    @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
    public static Iterable<Object[]> parameters() {
        List<Object[]> parameters = Lists.newArrayList();
        for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
            for (Boolean isStreaming : new Boolean[] {true, false}) {
                parameters.add(new Object[] {"testhadoop", Namespace.of(TestFixtures.DATABASE), format, isStreaming});
            }
        }
        return parameters;
    }

    /**
     * Returns the table environment for this test instance, creating it on first use.
     *
     * <p>The field is re-checked while holding the lock, so concurrent first callers end up sharing
     * a single environment instead of each building their own.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally {@code protected};
     * it is kept {@code public} here so the override stays compatible with the base class as
     * decompiled.
     *
     * @return the streaming or batch table environment, depending on the test parameter
     */
    @Override
    public TableEnvironment getTableEnv() {
        TableEnvironment env = this.tEnv;
        if (env == null) {
            synchronized (this) {
                env = this.tEnv;
                if (env == null) {
                    env = createTableEnv();
                    this.tEnv = env;
                }
            }
        }
        return env;
    }

    /** Builds a new streaming or batch table environment according to {@code isStreamingJob}. */
    private TableEnvironment createTableEnv() {
        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
        if (!this.isStreamingJob) {
            settingsBuilder.inBatchMode();
            return TableEnvironment.create(settingsBuilder.build());
        }

        settingsBuilder.inStreamingMode();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
        // Checkpoint interval passed to Flink (milliseconds per the enableCheckpointing API).
        streamEnv.enableCheckpointing(400L);
        streamEnv.setMaxParallelism(2);
        streamEnv.setParallelism(2);
        return StreamTableEnvironment.create(streamEnv, settingsBuilder.build());
    }

    /** Runs the base-class setup, then creates the test database and switches to it. */
    @Override
    @Before
    public void before() {
        super.before();
        sql("CREATE DATABASE IF NOT EXISTS %s", this.flinkDatabase);
        sql("USE CATALOG %s", this.catalogName);
        sql("USE %s", "db");
    }

    /** Drops the test database, then runs the base-class cleanup. */
    @Override
    @After
    public void clean() {
        sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        super.clean();
    }

    /**
     * Writes two batches into a table keyed on {@code (id, dt)} and partitioned by {@code dt}, then
     * checks the rows returned with a {@code dt} range filter, a {@code dt} equality filter and no
     * filter at all.
     */
    @Test
    public void testUpsertAndQuery() {
        String tableName = "test_upsert_query";
        LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
        LocalDate dt20220302 = LocalDate.of(2022, 3, 2);

        sql("CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(this.tableUpsertProps));
        try {
            sql("INSERT INTO %s VALUES (1, 'Bill', DATE '2022-03-01'),(1, 'Jane', DATE '2022-03-01'),(2, 'Jane', DATE '2022-03-01')", tableName);
            sql("INSERT INTO %s VALUES (2, 'Bill', DATE '2022-03-01'),(1, 'Jane', DATE '2022-03-02'),(2, 'Jane', DATE '2022-03-02')", tableName);

            List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
            TestHelpers.assertRows(sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);

            List<Row> rowsOn20220302 = Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
            TestHelpers.assertRows(sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);

            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
        } finally {
            dropTable(tableName);
        }
    }

    /**
     * Same shape as {@link #testUpsertAndQuery()}, but the table is created without the
     * {@code write.upsert.enabled} property and upsert is requested per statement through an
     * {@code OPTIONS('upsert-enabled'='true')} SQL hint instead.
     */
    @Test
    public void testUpsertOptions() {
        String tableName = "test_upsert_options";
        LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
        LocalDate dt20220302 = LocalDate.of(2022, 3, 2);

        // Copy the shared properties and drop the table-level upsert switch for this table only.
        Map<String, String> optionsUpsertProps = Maps.newHashMap(this.tableUpsertProps);
        optionsUpsertProps.remove("write.upsert.enabled");

        sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) PARTITIONED BY (province) WITH %s", tableName, toWithClause(optionsUpsertProps));
        try {
            sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES (1, 'a', DATE '2022-03-01'),(2, 'b', DATE '2022-03-01'),(1, 'b', DATE '2022-03-01')", tableName);
            sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES (4, 'a', DATE '2022-03-02'),(5, 'b', DATE '2022-03-02'),(1, 'b', DATE '2022-03-02')", tableName);

            List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
            TestHelpers.assertRows(sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);

            List<Row> rowsOn20220302 = Lists.newArrayList(Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
            TestHelpers.assertRows(sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);

            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
        } finally {
            dropTable(tableName);
        }
    }

    /**
     * Uses a table whose single primary-key column {@code id} is also its partition column, and
     * checks the table contents after an insert with a duplicate key, an insert that rewrites the
     * existing keys, and an insert of new keys.
     */
    @Test
    public void testPrimaryKeyEqualToPartitionKey() {
        String tableName = "upsert_on_id_key";
        try {
            sql("CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) PARTITIONED BY (id) WITH %s", tableName, toWithClause(this.tableUpsertProps));

            sql("INSERT INTO %s VALUES (1, 'Bill'),(1, 'Jane'),(2, 'Bill')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")));

            sql("INSERT INTO %s VALUES (1, 'Bill'),(2, 'Jane')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")));

            sql("INSERT INTO %s VALUES (3, 'Bill'),(4, 'Jane')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")));
        } finally {
            dropTable(tableName);
        }
    }

    /**
     * Uses a table whose primary-key columns {@code (id, dt)} are declared before the non-key
     * {@code name} column, and checks the table contents after each of three inserts.
     */
    @Test
    public void testPrimaryKeyFieldsAtBeginningOfSchema() {
        String tableName = "upsert_on_pk_at_schema_start";
        LocalDate dt = LocalDate.of(2022, 3, 1);
        try {
            sql("CREATE TABLE %s( id INT, dt DATE NOT NULL, name STRING NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(this.tableUpsertProps));

            sql("INSERT INTO %s VALUES (1, DATE '2022-03-01', 'Andy'),(1, DATE '2022-03-01', 'Bill'),(2, DATE '2022-03-01', 'Jane')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));

            sql("INSERT INTO %s VALUES (1, DATE '2022-03-01', 'Jane'),(2, DATE '2022-03-01', 'Bill')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));

            sql("INSERT INTO %s VALUES (3, DATE '2022-03-01', 'Duke'),(4, DATE '2022-03-01', 'Leon')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill"), Row.of(3, dt, "Duke"), Row.of(4, dt, "Leon")));
        } finally {
            dropTable(tableName);
        }
    }

    /**
     * Uses a table whose primary-key columns {@code (id, dt)} are declared after the non-key
     * {@code name} column, and checks the table contents after each of three inserts.
     */
    @Test
    public void testPrimaryKeyFieldsAtEndOfTableSchema() {
        String tableName = "upsert_on_pk_at_schema_end";
        LocalDate dt = LocalDate.of(2022, 3, 1);
        try {
            sql("CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(this.tableUpsertProps));

            sql("INSERT INTO %s VALUES ('Andy', 1, DATE '2022-03-01'),('Bill', 1, DATE '2022-03-01'),('Jane', 2, DATE '2022-03-01')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));

            sql("INSERT INTO %s VALUES ('Jane', 1, DATE '2022-03-01'),('Bill', 2, DATE '2022-03-01')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));

            sql("INSERT INTO %s VALUES ('Duke', 3, DATE '2022-03-01'),('Leon', 4, DATE '2022-03-01')", tableName);
            TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt), Row.of("Duke", 3, dt), Row.of("Leon", 4, dt)));
        } finally {
            dropTable(tableName);
        }
    }

    /** Drops {@code tableName} from the test database if it exists; used as per-test cleanup. */
    private void dropTable(String tableName) {
        sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
    }
}
