package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/sink/TestFlinkIcebergSink.class */
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {

    // Class-level rule: cluster resource obtained from MiniClusterResource with the classloader check disabled.
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    // Class-level rule: temporary folder handed to the catalog resource below.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    // Per-test rule: Hadoop catalog resource built on TEMPORARY_FOLDER for TestFixtures.DATABASE / TestFixtures.TABLE.
    @Rule
    public final HadoopCatalogResource catalogResource = new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
    // Loader for the test table; assigned in before().
    private TableLoader tableLoader;
    // File format under test, parsed from the first runner parameter.
    private final FileFormat format;
    // Parallelism under test (second runner parameter); used for the environment and the sink writers.
    private final int parallelism;
    // Whether the test table is created with an identity partition on "data" (third runner parameter).
    private final boolean partitioned;
    /**
     * Supplies the parameter matrix for the parameterized runner: every combination of file
     * format ("avro", "orc", "parquet"), parallelism (1, 2) and partitioned flag (true, false).
     *
     * @return one {@code {format, parallelism, partitioned}} row per test configuration
     */
    @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
    public static Object[][] parameters() {
        // The outer array must be Object[][]; a plain Object[] does not match the declared return type.
        return new Object[][] {
            {"avro", 1, true},
            {"avro", 1, false},
            {"avro", 2, true},
            {"avro", 2, false},
            {"orc", 1, true},
            {"orc", 1, false},
            {"orc", 2, true},
            {"orc", 2, false},
            {"parquet", 1, true},
            {"parquet", 1, false},
            {"parquet", 2, true},
            {"parquet", 2, false}
        };
    }

    /**
     * Receives one row of the parameter matrix from the parameterized runner.
     *
     * @param str file format name, resolved through {@code FileFormat.fromString}
     * @param i parallelism used by the tests
     * @param z whether the test table should be partitioned
     */
    public TestFlinkIcebergSink(String str, int i, boolean z) {
        this.partitioned = z;
        this.parallelism = i;
        this.format = FileFormat.fromString(str);
    }

    /**
     * Creates the test table (identity-partitioned on "data" when {@code partitioned} is set,
     * unpartitioned otherwise), a streaming environment with checkpointing every 100 ms and the
     * configured parallelism, and the table loader.
     *
     * @throws IOException declared for the setup calls
     */
    @Before
    public void before() throws IOException {
        PartitionSpec spec;
        if (this.partitioned) {
            spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build();
        } else {
            spec = PartitionSpec.unpartitioned();
        }
        this.table = this.catalogResource.catalog().createTable(
                TestFixtures.TABLE_IDENTIFIER,
                SimpleDataUtil.SCHEMA,
                spec,
                ImmutableMap.of("write.format.default", this.format.name()));

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
        this.env = environment
                .enableCheckpointing(100L)
                .setParallelism(this.parallelism)
                .setMaxParallelism(this.parallelism);

        this.tableLoader = this.catalogResource.tableLoader();
    }

    /**
     * Converts three {@link Row}s to the internal row format with {@code CONVERTER}, appends them
     * through {@code FlinkSink.forRowData} and checks that the table holds exactly those rows.
     *
     * @throws Exception if the Flink job or the table assertion fails
     */
    @Test
    public void testWriteRowData() throws Exception {
        List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
        DataStreamSource<Row> source = this.env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);

        // A bound method reference replaces the decompiled lambda, which referred to an undefined name.
        FlinkSink.forRowData(
                        source.map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)))
                .table(this.table)
                .tableLoader(this.tableLoader)
                .writeParallelism(this.parallelism)
                .append();

        // Run the job, then compare the table contents with the input rows.
        this.env.execute("Test Iceberg DataStream");
        SimpleDataUtil.assertTableRows(this.table, convertToRowData(rows));
    }

    /**
     * Writes the rows from {@code createRows("")} through {@code FlinkSink.forRow} with the given
     * table schema and distribution mode, runs the job and asserts the table contains those rows.
     *
     * @param tableSchema schema passed to the sink builder; callers in this class also pass {@code null}
     * @param distributionMode distribution mode passed to the sink builder; callers also pass {@code null}
     * @throws Exception if building or running the job, or the table assertion, fails
     */
    private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception {
        List<Row> rows = createRows("");
        DataStreamSource<Row> source = this.env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);

        FlinkSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
                .table(this.table)
                .tableLoader(this.tableLoader)
                .tableSchema(tableSchema)
                .writeParallelism(this.parallelism)
                .distributionMode(distributionMode)
                .append();

        this.env.execute("Test Iceberg DataStream.");

        SimpleDataUtil.assertTableRows(this.table, convertToRowData(rows));
    }

    /**
     * Counts the data files that {@code SimpleDataUtil.partitionDataFiles} reports for the
     * partition whose "data" value equals the argument.
     *
     * @param str value of the "data" partition field
     * @return number of data files returned for that partition
     * @throws IOException propagated from the data file lookup
     */
    private int partitionFiles(String str) throws IOException {
        int fileCount = SimpleDataUtil.partitionDataFiles(this.table, ImmutableMap.of("data", str)).size();
        return fileCount;
    }

    /** Writes rows with no explicit table schema and distribution mode NONE. */
    @Test
    public void testWriteRow() throws Exception {
        TableSchema noTableSchema = null;
        testWriteRow(noTableSchema, DistributionMode.NONE);
    }

    /** Writes rows with {@code SimpleDataUtil.FLINK_SCHEMA} as the table schema and distribution mode NONE. */
    @Test
    public void testWriteRowWithTableSchema() throws Exception {
        DistributionMode mode = DistributionMode.NONE;
        testWriteRow(SimpleDataUtil.FLINK_SCHEMA, mode);
    }

    /**
     * Sets the table's "write.distribution-mode" property to hash, writes with an explicit NONE
     * mode on the sink, and, for partitioned tables with parallelism above 1, expects more than
     * three data files across partitions "aaa", "bbb" and "ccc".
     */
    @Test
    public void testJobNoneDistributeMode() throws Exception {
        this.table.updateProperties()
                .set("write.distribution-mode", DistributionMode.HASH.modeName())
                .commit();

        testWriteRow(null, DistributionMode.NONE);

        if (this.parallelism > 1 && this.partitioned) {
            int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
            Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3);
        }
    }

    /**
     * Sets the table's "write.distribution-mode" property to hash, then expects a write that
     * requests RANGE distribution on the sink to fail with an {@link IllegalArgumentException}.
     */
    @Test
    public void testJobHashDistributionMode() {
        this.table.updateProperties()
                .set("write.distribution-mode", DistributionMode.HASH.modeName())
                .commit();

        String expectedMessage = "Flink does not support 'range' write distribution mode now.";
        AssertHelpers.assertThrows(
                "Does not support range distribution-mode now.",
                IllegalArgumentException.class,
                expectedMessage,
                () -> {
                    testWriteRow(null, DistributionMode.RANGE);
                    return null;
                });
    }

    /**
     * Sets the table's "write.distribution-mode" property to hash and writes with a {@code null}
     * distribution mode on the sink; for partitioned tables each of the partitions "aaa", "bbb"
     * and "ccc" must then contain exactly one data file.
     */
    @Test
    public void testJobNullDistributionMode() throws Exception {
        this.table.updateProperties()
                .set("write.distribution-mode", DistributionMode.HASH.modeName())
                .commit();

        testWriteRow(null, null);

        if (!this.partitioned) {
            return;
        }
        for (String partition : new String[] {"aaa", "bbb", "ccc"}) {
            Assert.assertEquals(
                    "There should be only 1 data file in partition '" + partition + "'",
                    1L,
                    partitionFiles(partition));
        }
    }

    /**
     * Writes with HASH distribution and no table schema; for partitioned tables each of the
     * partitions "aaa", "bbb" and "ccc" must contain exactly one data file.
     */
    @Test
    public void testPartitionWriteMode() throws Exception {
        testWriteRow(null, DistributionMode.HASH);

        if (!this.partitioned) {
            return;
        }
        int filesInAaa = partitionFiles("aaa");
        Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1L, filesInAaa);
        int filesInBbb = partitionFiles("bbb");
        Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1L, filesInBbb);
        int filesInCcc = partitionFiles("ccc");
        Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1L, filesInCcc);
    }

    /**
     * Writes with HASH distribution and {@code SimpleDataUtil.FLINK_SCHEMA} as the table schema;
     * for partitioned tables each of the partitions "aaa", "bbb" and "ccc" must contain exactly
     * one data file.
     */
    @Test
    public void testShuffleByPartitionWithSchema() throws Exception {
        testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);

        if (this.partitioned) {
            for (String partition : new String[] {"aaa", "bbb", "ccc"}) {
                String message = "There should be only 1 data file in partition '" + partition + "'";
                Assert.assertEquals(message, 1L, partitionFiles(partition));
            }
        }
    }

    /**
     * Builds one job containing two independent source-to-sink pipelines that write to the tables
     * "left" and "right", with auto-generated operator UIDs disabled. After the job runs, both
     * tables must contain their own rows, and only the right table's current snapshot summary may
     * carry the "flink.test" and "direction" properties set on the right sink.
     */
    @Test
    public void testTwoSinksInDisjointedDAG() throws Exception {
        ImmutableMap<String, String> tableProps = ImmutableMap.of("write.format.default", this.format.name());
        PartitionSpec spec;
        if (this.partitioned) {
            spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build();
        } else {
            spec = PartitionSpec.unpartitioned();
        }

        TableIdentifier leftId = TableIdentifier.of("left");
        Table leftTable = this.catalogResource.catalog().createTable(leftId, SimpleDataUtil.SCHEMA, spec, tableProps);
        TableLoader leftLoader = TableLoader.fromCatalog(this.catalogResource.catalogLoader(), leftId);

        TableIdentifier rightId = TableIdentifier.of("right");
        Table rightTable = this.catalogResource.catalog().createTable(rightId, SimpleDataUtil.SCHEMA, spec, tableProps);
        TableLoader rightLoader = TableLoader.fromCatalog(this.catalogResource.catalogLoader(), rightId);

        this.env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
                .enableCheckpointing(100L)
                .setParallelism(this.parallelism)
                .setMaxParallelism(this.parallelism);
        this.env.getConfig().disableAutoGeneratedUIDs();

        // Left pipeline: NONE distribution, no snapshot properties.
        List<Row> leftRows = createRows("left-");
        FlinkSink.forRow(
                        this.env.fromCollection(leftRows, ROW_TYPE_INFO).name("leftCustomSource").uid("leftCustomSource"),
                        SimpleDataUtil.FLINK_SCHEMA)
                .table(leftTable)
                .tableLoader(leftLoader)
                .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
                .distributionMode(DistributionMode.NONE)
                .uidPrefix("leftIcebergSink")
                .append();

        // Right pipeline: HASH distribution plus two snapshot properties.
        List<Row> rightRows = createRows("right-");
        FlinkSink.forRow(
                        this.env.fromCollection(rightRows, ROW_TYPE_INFO).name("rightCustomSource").uid("rightCustomSource"),
                        SimpleDataUtil.FLINK_SCHEMA)
                .table(rightTable)
                .tableLoader(rightLoader)
                .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
                .writeParallelism(this.parallelism)
                .distributionMode(DistributionMode.HASH)
                .uidPrefix("rightIcebergSink")
                .setSnapshotProperty("flink.test", TestFlinkIcebergSink.class.getName())
                .setSnapshotProperties(Collections.singletonMap("direction", "rightTable"))
                .append();

        this.env.execute("Test Iceberg DataStream.");

        SimpleDataUtil.assertTableRows(leftTable, convertToRowData(leftRows));
        SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));

        leftTable.refresh();
        Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test"));
        Assert.assertNull(leftTable.currentSnapshot().summary().get("direction"));

        rightTable.refresh();
        Assert.assertEquals(
                TestFlinkIcebergSink.class.getName(),
                rightTable.currentSnapshot().summary().get("flink.test"));
        Assert.assertEquals("rightTable", rightTable.currentSnapshot().summary().get("direction"));
    }

    /**
     * Passes "UNRECOGNIZED" as the distribution-mode write option through {@code setAll} and
     * expects appending the sink and running the job to fail with an {@link IllegalArgumentException}.
     */
    @Test
    public void testOverrideWriteConfigWithUnknownDistributionMode() {
        HashMap<String, String> writeOptions = Maps.newHashMap();
        writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");

        DataStreamSource<Row> source = this.env.addSource(createBoundedSource(createRows("")), ROW_TYPE_INFO);
        FlinkSink.Builder sinkBuilder = FlinkSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
                .table(this.table)
                .tableLoader(this.tableLoader)
                .writeParallelism(this.parallelism)
                .setAll(writeOptions);

        AssertHelpers.assertThrows(
                "Should fail with invalid distribution mode.",
                IllegalArgumentException.class,
                "Invalid distribution mode: UNRECOGNIZED",
                () -> {
                    sinkBuilder.append();
                    this.env.execute("Test Iceberg DataStream.");
                    return null;
                });
    }

    /**
     * Passes "UNRECOGNIZED" as the write-format option through {@code setAll} and expects
     * appending the sink and running the job to fail with an {@link IllegalArgumentException}.
     */
    @Test
    public void testOverrideWriteConfigWithUnknownFileFormat() {
        HashMap<String, String> writeOptions = Maps.newHashMap();
        writeOptions.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");

        DataStreamSource<Row> source = this.env.addSource(createBoundedSource(createRows("")), ROW_TYPE_INFO);
        FlinkSink.Builder sinkBuilder = FlinkSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
                .table(this.table)
                .tableLoader(this.tableLoader)
                .writeParallelism(this.parallelism)
                .setAll(writeOptions);

        AssertHelpers.assertThrows(
                "Should fail with invalid file format.",
                IllegalArgumentException.class,
                "Invalid file format: UNRECOGNIZED",
                () -> {
                    sinkBuilder.append();
                    this.env.execute("Test Iceberg DataStream.");
                    return null;
                });
    }

    // NOTE: the decompiled `$deserializeLambda$(SerializedLambda)` method that used to follow was
    // removed on purpose. It is the compiler-synthesized hook for deserializing this class's
    // serializable lambdas, not hand-written test code: javac generates it when the class is
    // compiled, so the source must not declare it as well. The decompiled body was also not valid
    // Java (an int assigned to a boolean selector, a switch over that boolean, and a lambda over
    // an undefined variable).
}
