package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.parquet.Parquet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.class */
public class BigQueryMetastoreCatalogIT {

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    @Rule
    public TestName testName = new TestName();
    private static Catalog catalog;
    private static Map<String, String> catalogProps;
    private TableIdentifier tableIdentifier;
    private static final Schema DOUBLY_NESTED_ROW_SCHEMA = Schema.builder().addStringField("doubly_nested_str").addInt64Field("doubly_nested_float").build();
    private static final Schema NESTED_ROW_SCHEMA = Schema.builder().addStringField("nested_str").addInt32Field("nested_int").addFloatField("nested_float").addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA).build();
    private static final Schema BEAM_SCHEMA = Schema.builder().addStringField("str").addBooleanField("bool").addNullableInt32Field("nullable_int").addNullableInt64Field("nullable_long").addArrayField("arr_long", Schema.FieldType.INT64).addRowField("row", NESTED_ROW_SCHEMA).addNullableRowField("nullable_row", NESTED_ROW_SCHEMA).build();
    private static final SimpleFunction<Long, Row> ROW_FUNC = new SimpleFunction<Long, Row>() { // from class: org.apache.beam.sdk.io.iceberg.BigQueryMetastoreCatalogIT.1
        public Row apply(Long l) {
            String l2 = Long.toString(l.longValue());
            Row build = Row.withSchema(BigQueryMetastoreCatalogIT.NESTED_ROW_SCHEMA).addValue("nested_str_value_" + l2).addValue(Integer.valueOf(l2)).addValue(Float.valueOf(l2 + "." + l2)).addValue(Row.withSchema(BigQueryMetastoreCatalogIT.DOUBLY_NESTED_ROW_SCHEMA).addValue("doubly_nested_str_value_" + l2).addValue(l).build()).build();
            return Row.withSchema(BigQueryMetastoreCatalogIT.BEAM_SCHEMA).addValue("str_value_" + l2).addValue(Boolean.valueOf(l.longValue() % 2 == 0)).addValue(Integer.valueOf(l2)).addValue(l).addValue(LongStream.range(1L, l.longValue() % 10).boxed().collect(Collectors.toList())).addValue(build).addValue(l.longValue() % 2 == 0 ? null : build).build();
        }
    };
    private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
    private static final SimpleFunction<Row, Record> RECORD_FUNC = new SimpleFunction<Row, Record>() { // from class: org.apache.beam.sdk.io.iceberg.BigQueryMetastoreCatalogIT.2
        public Record apply(Row row) {
            return IcebergUtils.beamRowToIcebergRecord(BigQueryMetastoreCatalogIT.ICEBERG_SCHEMA, row);
        }
    };
    private static final String TEST_CATALOG = "beam_test_" + System.nanoTime();
    private static final String DATASET = "iceberg_bigquerymetastore_test_" + System.nanoTime();
    private static final String WAREHOUSE = TestPipeline.testingPipelineOptions().getTempLocation();

    @BeforeClass
    public static void setUp() {
        catalogProps = ImmutableMap.builder().put("gcp_project", TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject()).put("gcp_location", "us-central1").put("catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog").put("warehouse", WAREHOUSE).build();
        catalog = CatalogUtil.loadCatalog(catalogProps.get("catalog-impl"), TEST_CATALOG, catalogProps, new Configuration());
        catalog.initialize(TEST_CATALOG, catalogProps);
        catalog.createNamespace(Namespace.of(new String[]{DATASET}));
    }

    @After
    public void cleanup() {
        catalog.dropTable(this.tableIdentifier);
    }

    @AfterClass
    public static void tearDown() {
        catalog.dropNamespace(Namespace.of(new String[]{DATASET}));
    }

    private Map<String, Object> getManagedIcebergConfig(TableIdentifier tableIdentifier) {
        return ImmutableMap.builder().put("table", tableIdentifier.toString()).put("catalog_name", TEST_CATALOG).put("catalog_properties", catalogProps).build();
    }

    @Test
    public void testReadWithBqmsCatalog() throws IOException {
        this.tableIdentifier = TableIdentifier.parse(String.format("%s.%s", DATASET, this.testName.getMethodName()));
        Table createTable = catalog.createTable(this.tableIdentifier, ICEBERG_SCHEMA);
        Stream<Long> boxed = LongStream.range(1L, 1000L).boxed();
        SimpleFunction<Long, Row> simpleFunction = ROW_FUNC;
        Objects.requireNonNull(simpleFunction);
        List list = (List) boxed.map((v1) -> {
            return r1.apply(v1);
        }).collect(Collectors.toList());
        Stream stream = list.stream();
        SimpleFunction<Row, Record> simpleFunction2 = RECORD_FUNC;
        Objects.requireNonNull(simpleFunction2);
        List list2 = (List) stream.map((v1) -> {
            return r1.apply(v1);
        }).collect(Collectors.toList());
        String str = createTable.location() + "/" + UUID.randomUUID();
        DataWriter build = Parquet.writeData(createTable.io().newOutputFile(str)).schema(ICEBERG_SCHEMA).createWriterFunc(GenericParquetWriter::buildWriter).overwrite().withSpec(createTable.spec()).build();
        Iterator it = list2.iterator();
        while (it.hasNext()) {
            build.write((Record) it.next());
        }
        build.close();
        AppendFiles newAppend = createTable.newAppend();
        ManifestWriter write = ManifestFiles.write(createTable.spec(), createTable.io().newOutputFile(FileFormat.AVRO.addExtension(str + ".manifest")));
        Throwable th = null;
        try {
            try {
                write.add(build.toDataFile());
                if (write != null) {
                    if (0 != 0) {
                        try {
                            write.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        write.close();
                    }
                }
                newAppend.appendManifest(write.toManifestFile());
                newAppend.commit();
                PAssert.that(this.readPipeline.apply(Managed.read("iceberg").withConfig(getManagedIcebergConfig(this.tableIdentifier))).getSinglePCollection()).containsInAnyOrder(list);
                this.readPipeline.run().waitUntilFinish();
            } finally {
            }
        } catch (Throwable th3) {
            if (write != null) {
                if (th != null) {
                    try {
                        write.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    write.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWriteWithBqmsCatalog() {
        this.tableIdentifier = TableIdentifier.parse(String.format("%s.%s", DATASET, this.testName.getMethodName()));
        catalog.createTable(this.tableIdentifier, IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA));
        LongStream range = LongStream.range(1L, 1000L);
        SimpleFunction<Long, Row> simpleFunction = ROW_FUNC;
        Objects.requireNonNull(simpleFunction);
        List list = (List) range.mapToObj((v1) -> {
            return r1.apply(v1);
        }).collect(Collectors.toList());
        Stream stream = list.stream();
        SimpleFunction<Row, Record> simpleFunction2 = RECORD_FUNC;
        Objects.requireNonNull(simpleFunction2);
        List list2 = (List) stream.map((v1) -> {
            return r1.apply(v1);
        }).collect(Collectors.toList());
        this.writePipeline.apply(Create.of(list)).setRowSchema(BEAM_SCHEMA).apply(Managed.write("iceberg").withConfig(getManagedIcebergConfig(this.tableIdentifier)));
        this.writePipeline.run().waitUntilFinish();
        Table loadTable = catalog.loadTable(this.tableIdentifier);
        TableScan tableScan = (TableScan) loadTable.newScan().project(ICEBERG_SCHEMA);
        ArrayList arrayList = new ArrayList();
        CloseableIterator it = tableScan.planTasks().iterator();
        while (it.hasNext()) {
            CombinedScanTask combinedScanTask = (CombinedScanTask) it.next();
            InputFilesDecryptor inputFilesDecryptor = new InputFilesDecryptor(combinedScanTask, loadTable.io(), loadTable.encryption());
            for (FileScanTask fileScanTask : combinedScanTask.files()) {
                CloseableIterator it2 = Parquet.read(inputFilesDecryptor.getInputFile(fileScanTask)).split(fileScanTask.start(), fileScanTask.length()).project(ICEBERG_SCHEMA).createReaderFunc(messageType -> {
                    return GenericParquetReaders.buildReader(ICEBERG_SCHEMA, messageType);
                }).filter(fileScanTask.residual()).build().iterator();
                while (it2.hasNext()) {
                    arrayList.add((Record) it2.next());
                }
            }
        }
        MatcherAssert.assertThat(list2, Matchers.containsInAnyOrder(arrayList.toArray()));
    }
}
