package org.apache.beam.sdk.io.iceberg;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.commons.compress.utils.Lists;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.parquet.Parquet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.class */
public class IcebergIOWriteTest implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergIOWriteTest.class);

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");

    @Rule
    public transient TestPipeline testPipeline = TestPipeline.create();

    @Test
    public void testSimpleAppend() throws Exception {
        TableIdentifier of = TableIdentifier.of(new String[]{"default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)});
        Table createTable = this.warehouse.createTable(of, TestFixtures.SCHEMA);
        Properties properties = new Properties();
        properties.setProperty("type", "hadoop");
        properties.setProperty("warehouse", this.warehouse.location);
        this.testPipeline.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))).setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)).apply("Append To Table", IcebergIO.writeRows(IcebergCatalogConfig.builder().setCatalogName("name").setProperties(properties).build()).to(of));
        LOG.info("Executing pipeline");
        this.testPipeline.run().waitUntilFinish();
        LOG.info("Done running pipeline");
        MatcherAssert.assertThat(ImmutableList.copyOf(IcebergGenerics.read(createTable).build()), Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
    }

    @Test
    public void testDynamicDestinationsWithoutSpillover() throws Exception {
        final String l = Long.toString(UUID.randomUUID().hashCode(), 16);
        TableIdentifier of = TableIdentifier.of(new String[]{"default", "table1-" + l});
        TableIdentifier of2 = TableIdentifier.of(new String[]{"default", "table2-" + l});
        TableIdentifier of3 = TableIdentifier.of(new String[]{"default", "table3-" + l});
        Table createTable = this.warehouse.createTable(of, TestFixtures.SCHEMA);
        Table createTable2 = this.warehouse.createTable(of2, TestFixtures.SCHEMA);
        Table createTable3 = this.warehouse.createTable(of3, TestFixtures.SCHEMA);
        Properties properties = new Properties();
        properties.setProperty("type", "hadoop");
        properties.setProperty("warehouse", this.warehouse.location);
        this.testPipeline.apply("Records To Add", Create.of(TestFixtures.asRows(Iterables.concat(TestFixtures.FILE1SNAPSHOT1, TestFixtures.FILE1SNAPSHOT2, TestFixtures.FILE1SNAPSHOT3)))).setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)).apply("Append To Table", IcebergIO.writeRows(IcebergCatalogConfig.builder().setCatalogName("name").setProperties(properties).build()).to(new DynamicDestinations() { // from class: org.apache.beam.sdk.io.iceberg.IcebergIOWriteTest.1
            private final Schema schema = Schema.builder().addInt64Field("tableNumber").build();

            public Schema getMetadataSchema() {
                return this.schema;
            }

            public Row assignDestinationMetadata(Row row) {
                return Row.withSchema(this.schema).addValues(new Object[]{Long.valueOf((row.getInt64("id").longValue() / 3) + 1)}).build();
            }

            public IcebergDestination instantiateDestination(Row row) {
                return IcebergDestination.builder().setTableIdentifier(TableIdentifier.of(new String[]{"default", "table" + row.getInt64("tableNumber") + "-" + l})).setFileFormat(FileFormat.PARQUET).build();
            }
        }));
        LOG.info("Executing pipeline");
        this.testPipeline.run().waitUntilFinish();
        LOG.info("Done running pipeline");
        ImmutableList copyOf = ImmutableList.copyOf(IcebergGenerics.read(createTable).build());
        ImmutableList copyOf2 = ImmutableList.copyOf(IcebergGenerics.read(createTable2).build());
        ImmutableList copyOf3 = ImmutableList.copyOf(IcebergGenerics.read(createTable3).build());
        MatcherAssert.assertThat(copyOf, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
        MatcherAssert.assertThat(copyOf2, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
        MatcherAssert.assertThat(copyOf3, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT3.toArray()));
    }

    @Test
    public void testDynamicDestinationsWithSpillover() throws Exception {
        final String l = Long.toString(UUID.randomUUID().hashCode(), 16);
        final int i = 100;
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (int i2 = 0; i2 < 100; i2++) {
            TableIdentifier of = TableIdentifier.of(new String[]{"default", "table" + i2 + "-" + l});
            newArrayList.add(of);
            newArrayList2.add(this.warehouse.createTable(of, TestFixtures.SCHEMA));
        }
        int i3 = 10 * 100;
        ArrayList newArrayList3 = Lists.newArrayList();
        GenericRecord create = GenericRecord.create(TestFixtures.SCHEMA);
        HashMap newHashMap = Maps.newHashMap();
        for (int i4 = 0; i4 < i3; i4++) {
            Record copy = create.copy(ImmutableMap.of("id", Long.valueOf(i4), "data", "data for " + i4));
            TableIdentifier tableIdentifier = (TableIdentifier) newArrayList.get(i4 % 100);
            newArrayList3.add(copy);
            ((List) newHashMap.computeIfAbsent(tableIdentifier, tableIdentifier2 -> {
                return Lists.newArrayList();
            })).add(copy);
        }
        Properties properties = new Properties();
        properties.setProperty("type", "hadoop");
        properties.setProperty("warehouse", this.warehouse.location);
        this.testPipeline.apply("Records To Add", Create.of(TestFixtures.asRows(newArrayList3))).setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)).apply("Append To Table", IcebergIO.writeRows(IcebergCatalogConfig.builder().setCatalogName("name").setProperties(properties).build()).to(new DynamicDestinations() { // from class: org.apache.beam.sdk.io.iceberg.IcebergIOWriteTest.2
            private final Schema schema = Schema.builder().addInt64Field("tableNumber").build();

            public Schema getMetadataSchema() {
                return this.schema;
            }

            public Row assignDestinationMetadata(Row row) {
                return Row.withSchema(this.schema).addValues(new Object[]{Long.valueOf(row.getInt64("id").longValue() % i)}).build();
            }

            public IcebergDestination instantiateDestination(Row row) {
                return IcebergDestination.builder().setTableIdentifier(TableIdentifier.of(new String[]{"default", "table" + row.getInt64("tableNumber") + "-" + l})).setFileFormat(FileFormat.PARQUET).build();
            }
        }));
        LOG.info("Executing pipeline");
        this.testPipeline.run().waitUntilFinish();
        LOG.info("Done running pipeline");
        for (int i5 = 0; i5 < 100; i5++) {
            MatcherAssert.assertThat(ImmutableList.copyOf(IcebergGenerics.read((Table) newArrayList2.get(i5)).build()), Matchers.containsInAnyOrder(((List) newHashMap.get((TableIdentifier) newArrayList.get(i5))).toArray()));
        }
    }

    @Test
    public void testIdempotentCommit() throws Exception {
        Table createTable = this.warehouse.createTable(TableIdentifier.of(new String[]{"default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)}), TestFixtures.SCHEMA);
        Record rowToRecord = SchemaAndRowConversions.rowToRecord(createTable.schema(), Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)).addValues(new Object[]{42L, "bizzle"}).build());
        DataWriter build = Parquet.writeData(createTable.io().newOutputFile(TEMPORARY_FOLDER.newFile().toString())).createWriterFunc(GenericParquetWriter::buildWriter).schema(createTable.schema()).withSpec(createTable.spec()).overwrite().build();
        build.write(rowToRecord);
        build.close();
        DataFile dataFile = build.toDataFile();
        AppendFiles newAppend = createTable.newAppend();
        newAppend.appendFile(dataFile);
        newAppend.commit();
        AppendFiles newAppend2 = createTable.newAppend();
        newAppend2.appendFile(dataFile);
        newAppend2.commit();
    }
}
