package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.parquet.Parquet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/IcebergIOIT.class */
public class IcebergIOIT implements Serializable {
    private static final Schema DOUBLY_NESTED_ROW_SCHEMA = Schema.builder().addStringField("doubly_nested_str").addInt64Field("doubly_nested_float").build();
    private static final Schema NESTED_ROW_SCHEMA = Schema.builder().addStringField("nested_str").addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA).addInt32Field("nested_int").addFloatField("nested_float").build();
    private static final Schema BEAM_SCHEMA = Schema.builder().addStringField("str").addStringField("char").addInt64Field("modulo_5").addBooleanField("bool").addInt32Field("int").addRowField("row", NESTED_ROW_SCHEMA).addArrayField("arr_long", Schema.FieldType.INT64).addNullableRowField("nullable_row", NESTED_ROW_SCHEMA).addNullableInt64Field("nullable_long").build();
    private static final SimpleFunction<Long, Row> ROW_FUNC = new SimpleFunction<Long, Row>() { // from class: org.apache.beam.sdk.io.iceberg.IcebergIOIT.1
        public Row apply(Long l) {
            String l2 = Long.toString(l.longValue());
            Row build = Row.withSchema(IcebergIOIT.NESTED_ROW_SCHEMA).addValue("nested_str_value_" + l2).addValue(Row.withSchema(IcebergIOIT.DOUBLY_NESTED_ROW_SCHEMA).addValue("doubly_nested_str_value_" + l2).addValue(l).build()).addValue(Integer.valueOf(l2)).addValue(Float.valueOf(l2 + "." + l2)).build();
            return Row.withSchema(IcebergIOIT.BEAM_SCHEMA).addValue("value_" + l2).addValue(String.valueOf((char) (97 + (l.longValue() % 5)))).addValue(Long.valueOf(l.longValue() % 5)).addValue(Boolean.valueOf(l.longValue() % 2 == 0)).addValue(Integer.valueOf(l2)).addValue(build).addValue(LongStream.range(0L, l.longValue() % 10).boxed().collect(Collectors.toList())).addValue(l.longValue() % 2 == 0 ? null : build).addValue(l).build();
        }
    };
    private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
    private static final SimpleFunction<Row, Record> RECORD_FUNC = new SimpleFunction<Row, Record>() { // from class: org.apache.beam.sdk.io.iceberg.IcebergIOIT.2
        public Record apply(Row row) {
            return IcebergUtils.beamRowToIcebergRecord(IcebergIOIT.ICEBERG_SCHEMA, row);
        }
    };
    private static final Integer NUM_RECORDS = 1000;
    private static final Integer NUM_SHARDS = 10;
    static GcpOptions options;
    static Configuration catalogHadoopConf;
    private String warehouseLocation;
    private String tableId;
    private Catalog catalog;
    private static final List<Row> INPUT_ROWS;

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void beforeClass() {
        options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
        catalogHadoopConf = new Configuration();
        catalogHadoopConf.set("fs.gs.project.id", options.getProject());
        catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
    }

    @Before
    public void setUp() {
        this.warehouseLocation = String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID());
        this.tableId = this.testName.getMethodName() + ".test_table";
        this.catalog = new HadoopCatalog(catalogHadoopConf, this.warehouseLocation);
    }

    private List<Row> populateTable(Table table) throws IOException {
        long round = Math.round(Math.ceil(NUM_RECORDS.doubleValue() / NUM_SHARDS.intValue()));
        AppendFiles newAppend = table.newAppend();
        ArrayList arrayList = new ArrayList(NUM_RECORDS.intValue());
        int i = 0;
        for (int i2 = 0; i2 < NUM_SHARDS.intValue(); i2++) {
            DataWriter build = Parquet.writeData(table.io().newOutputFile(table.location() + "/" + UUID.randomUUID())).schema(ICEBERG_SCHEMA).createWriterFunc(GenericParquetWriter::buildWriter).overwrite().withSpec(table.spec()).build();
            int i3 = 0;
            while (i3 < round && i < NUM_RECORDS.intValue()) {
                Row row = (Row) ROW_FUNC.apply(Long.valueOf(i3));
                build.write((Record) RECORD_FUNC.apply(row));
                arrayList.add(row);
                i3++;
                i++;
            }
            build.close();
            newAppend.appendFile(build.toDataFile());
        }
        newAppend.commit();
        return arrayList;
    }

    private List<Record> readRecords(Table table) {
        org.apache.iceberg.Schema schema = table.schema();
        TableScan tableScan = (TableScan) table.newScan().project(schema);
        ArrayList arrayList = new ArrayList();
        CloseableIterator it = tableScan.planTasks().iterator();
        while (it.hasNext()) {
            CombinedScanTask combinedScanTask = (CombinedScanTask) it.next();
            InputFilesDecryptor inputFilesDecryptor = new InputFilesDecryptor(combinedScanTask, table.io(), table.encryption());
            for (FileScanTask fileScanTask : combinedScanTask.files()) {
                CloseableIterator it2 = Parquet.read(inputFilesDecryptor.getInputFile(fileScanTask)).split(fileScanTask.start(), fileScanTask.length()).project(schema).createReaderFunc(messageType -> {
                    return GenericParquetReaders.buildReader(schema, messageType);
                }).filter(fileScanTask.residual()).build().iterator();
                while (it2.hasNext()) {
                    arrayList.add((Record) it2.next());
                }
            }
        }
        return arrayList;
    }

    private Map<String, Object> managedIcebergConfig(String str) {
        return ImmutableMap.builder().put("table", str).put("catalog_name", "test-name").put("catalog_properties", ImmutableMap.builder().put("type", "hadoop").put("warehouse", this.warehouseLocation).build()).build();
    }

    @Test
    public void testRead() throws Exception {
        PAssert.that(this.pipeline.apply(Managed.read("iceberg").withConfig(managedIcebergConfig(this.tableId))).getSinglePCollection()).containsInAnyOrder(populateTable(this.catalog.createTable(TableIdentifier.parse(this.tableId), ICEBERG_SCHEMA)));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testWrite() {
        Table createTable = this.catalog.createTable(TableIdentifier.parse(this.tableId), ICEBERG_SCHEMA);
        this.pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA).apply(Managed.write("iceberg").withConfig(managedIcebergConfig(this.tableId)));
        this.pipeline.run().waitUntilFinish();
        List<Record> readRecords = readRecords(createTable);
        Stream<Row> stream = INPUT_ROWS.stream();
        SimpleFunction<Row, Record> simpleFunction = RECORD_FUNC;
        Objects.requireNonNull(simpleFunction);
        MatcherAssert.assertThat(readRecords, Matchers.containsInAnyOrder(stream.map((v1) -> {
            return r2.apply(v1);
        }).toArray()));
    }

    @Test
    public void testWritePartitionedData() {
        Table createTable = this.catalog.createTable(TableIdentifier.parse(this.tableId), ICEBERG_SCHEMA, PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").truncate("str", "value_x".length()).build());
        this.pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA).apply(Managed.write("iceberg").withConfig(managedIcebergConfig(this.tableId)));
        this.pipeline.run().waitUntilFinish();
        List<Record> readRecords = readRecords(createTable);
        Stream<Row> stream = INPUT_ROWS.stream();
        SimpleFunction<Row, Record> simpleFunction = RECORD_FUNC;
        Objects.requireNonNull(simpleFunction);
        MatcherAssert.assertThat(readRecords, Matchers.containsInAnyOrder(stream.map((v1) -> {
            return r2.apply(v1);
        }).toArray()));
    }

    private PeriodicImpulse getStreamingSource() {
        return PeriodicImpulse.create().stopAfter(Duration.millis(NUM_RECORDS.intValue() - 1)).withInterval(Duration.millis(1L));
    }

    @Test
    public void testStreamingWrite() {
        Table createTable = this.catalog.createTable(TableIdentifier.parse(this.tableId), ICEBERG_SCHEMA, PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build());
        HashMap hashMap = new HashMap(managedIcebergConfig(this.tableId));
        hashMap.put("triggering_frequency_seconds", 4);
        PCollection rowSchema = this.pipeline.apply(getStreamingSource()).apply(MapElements.into(TypeDescriptors.rows()).via(instant -> {
            return (Row) ROW_FUNC.apply(Long.valueOf(instant.getMillis() % NUM_RECORDS.intValue()));
        })).setRowSchema(BEAM_SCHEMA);
        MatcherAssert.assertThat(rowSchema.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
        rowSchema.apply(Managed.write("iceberg").withConfig(hashMap));
        this.pipeline.run().waitUntilFinish();
        List<Record> readRecords = readRecords(createTable);
        Stream<Row> stream = INPUT_ROWS.stream();
        SimpleFunction<Row, Record> simpleFunction = RECORD_FUNC;
        Objects.requireNonNull(simpleFunction);
        MatcherAssert.assertThat(readRecords, Matchers.containsInAnyOrder(stream.map((v1) -> {
            return r2.apply(v1);
        }).toArray()));
    }

    @Test
    public void testStreamingWriteWithPriorWindowing() {
        Table createTable = this.catalog.createTable(TableIdentifier.parse(this.tableId), ICEBERG_SCHEMA, PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build());
        HashMap hashMap = new HashMap(managedIcebergConfig(this.tableId));
        hashMap.put("triggering_frequency_seconds", 4);
        PCollection rowSchema = this.pipeline.apply(getStreamingSource()).apply(Window.into(FixedWindows.of(Duration.standardSeconds(1L))).accumulatingFiredPanes()).apply(MapElements.into(TypeDescriptors.rows()).via(instant -> {
            return (Row) ROW_FUNC.apply(Long.valueOf(instant.getMillis() % NUM_RECORDS.intValue()));
        })).setRowSchema(BEAM_SCHEMA);
        MatcherAssert.assertThat(rowSchema.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
        rowSchema.apply(Managed.write("iceberg").withConfig(hashMap));
        this.pipeline.run().waitUntilFinish();
        List<Record> readRecords = readRecords(createTable);
        Stream<Row> stream = INPUT_ROWS.stream();
        SimpleFunction<Row, Record> simpleFunction = RECORD_FUNC;
        Objects.requireNonNull(simpleFunction);
        MatcherAssert.assertThat(readRecords, Matchers.containsInAnyOrder(stream.map((v1) -> {
            return r2.apply(v1);
        }).toArray()));
    }

    private void writeToDynamicDestinations(String str) {
        writeToDynamicDestinations(str, false, false);
    }

    private void writeToDynamicDestinations(String str, boolean z, boolean z2) {
        PCollection apply;
        HashMap hashMap = new HashMap(managedIcebergConfig(this.tableId + "_{modulo_5}_{char}"));
        List asList = Arrays.asList("row", "str", "int", "nullable_long");
        RowFilter rowFilter = new RowFilter(BEAM_SCHEMA);
        if (str != null) {
            boolean z3 = -1;
            switch (str.hashCode()) {
                case 3092207:
                    if (str.equals("drop")) {
                        z3 = false;
                        break;
                    }
                    break;
                case 3287941:
                    if (str.equals("keep")) {
                        z3 = true;
                        break;
                    }
                    break;
                case 3415980:
                    if (str.equals("only")) {
                        z3 = 2;
                        break;
                    }
                    break;
            }
            switch (z3) {
                case false:
                    rowFilter = rowFilter.drop(asList);
                    hashMap.put(str, asList);
                    break;
                case true:
                    rowFilter = rowFilter.keep(asList);
                    hashMap.put(str, asList);
                    break;
                case true:
                    rowFilter = rowFilter.only((String) asList.get(0));
                    hashMap.put(str, asList.get(0));
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown operation: " + str);
            }
        }
        org.apache.iceberg.Schema beamSchemaToIcebergSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
        PartitionSpec partitionSpec = null;
        if (z2) {
            Preconditions.checkState(str == null || !str.equals("only"));
            partitionSpec = PartitionSpec.builderFor(beamSchemaToIcebergSchema).identity("bool").identity("modulo_5").build();
        }
        Table createTable = this.catalog.createTable(TableIdentifier.parse(this.tableId + "_0_a"), beamSchemaToIcebergSchema, partitionSpec);
        Table createTable2 = this.catalog.createTable(TableIdentifier.parse(this.tableId + "_1_b"), beamSchemaToIcebergSchema, partitionSpec);
        Table createTable3 = this.catalog.createTable(TableIdentifier.parse(this.tableId + "_2_c"), beamSchemaToIcebergSchema, partitionSpec);
        Table createTable4 = this.catalog.createTable(TableIdentifier.parse(this.tableId + "_3_d"), beamSchemaToIcebergSchema, partitionSpec);
        Table createTable5 = this.catalog.createTable(TableIdentifier.parse(this.tableId + "_4_e"), beamSchemaToIcebergSchema, partitionSpec);
        if (z) {
            hashMap.put("triggering_frequency_seconds", 5);
            apply = (PCollection) this.pipeline.apply(getStreamingSource()).apply(MapElements.into(TypeDescriptors.rows()).via(instant -> {
                return (Row) ROW_FUNC.apply(Long.valueOf(instant.getMillis() % NUM_RECORDS.intValue()));
            }));
        } else {
            apply = this.pipeline.apply(Create.of(INPUT_ROWS));
        }
        apply.setRowSchema(BEAM_SCHEMA).apply(Managed.write("iceberg").withConfig(hashMap));
        this.pipeline.run().waitUntilFinish();
        List asList2 = Arrays.asList(readRecords(createTable), readRecords(createTable2), readRecords(createTable3), readRecords(createTable4), readRecords(createTable5));
        SerializableFunction serializableFunction = row -> {
            return IcebergUtils.beamRowToIcebergRecord(beamSchemaToIcebergSchema, row);
        };
        for (int i = 0; i < asList2.size(); i++) {
            List list = (List) asList2.get(i);
            long j = i;
            Stream<Row> filter = INPUT_ROWS.stream().filter(row2 -> {
                return ((Long) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(row2.getInt64("modulo_5"))).longValue() == j;
            });
            RowFilter rowFilter2 = rowFilter;
            Objects.requireNonNull(rowFilter2);
            Stream<R> map = filter.map(rowFilter2::filter);
            Objects.requireNonNull(serializableFunction);
            MatcherAssert.assertThat(list, Matchers.containsInAnyOrder(map.map((v1) -> {
                return r1.apply(v1);
            }).toArray()));
        }
    }

    @Test
    public void testWriteToDynamicDestinations() {
        writeToDynamicDestinations(null);
    }

    @Test
    public void testWriteToDynamicDestinationsAndDropFields() {
        writeToDynamicDestinations("drop");
    }

    @Test
    public void testWriteToDynamicDestinationsAndKeepFields() {
        writeToDynamicDestinations("keep");
    }

    @Test
    public void testWriteToDynamicDestinationsWithOnlyRecord() {
        writeToDynamicDestinations("only");
    }

    @Test
    public void testStreamToDynamicDestinationsAndKeepFields() {
        writeToDynamicDestinations("keep", true, false);
    }

    @Test
    public void testStreamToPartitionedDynamicDestinations() {
        writeToDynamicDestinations(null, true, true);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1066368957:
                if (implMethodName.equals("lambda$writeToDynamicDestinations$710e5bda$1")) {
                    z = 2;
                    break;
                }
                break;
            case -743282270:
                if (implMethodName.equals("lambda$writeToDynamicDestinations$f6c30e47$1")) {
                    z = false;
                    break;
                }
                break;
            case 1139104799:
                if (implMethodName.equals("lambda$testStreamingWriteWithPriorWindowing$5a007597$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1997462189:
                if (implMethodName.equals("lambda$testStreamingWrite$5a007597$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/iceberg/IcebergIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/iceberg/Schema;Lorg/apache/beam/sdk/values/Row;)Lorg/apache/iceberg/data/Record;")) {
                    org.apache.iceberg.Schema schema = (org.apache.iceberg.Schema) serializedLambda.getCapturedArg(0);
                    return row -> {
                        return IcebergUtils.beamRowToIcebergRecord(schema, row);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/iceberg/IcebergIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/sdk/values/Row;")) {
                    return instant -> {
                        return (Row) ROW_FUNC.apply(Long.valueOf(instant.getMillis() % NUM_RECORDS.intValue()));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/iceberg/IcebergIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/sdk/values/Row;")) {
                    return instant2 -> {
                        return (Row) ROW_FUNC.apply(Long.valueOf(instant2.getMillis() % NUM_RECORDS.intValue()));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/iceberg/IcebergIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/sdk/values/Row;")) {
                    return instant3 -> {
                        return (Row) ROW_FUNC.apply(Long.valueOf(instant3.getMillis() % NUM_RECORDS.intValue()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        Stream<Long> boxed = LongStream.range(0L, NUM_RECORDS.intValue()).boxed();
        SimpleFunction<Long, Row> simpleFunction = ROW_FUNC;
        Objects.requireNonNull(simpleFunction);
        INPUT_ROWS = (List) boxed.map((v1) -> {
            return r1.apply(v1);
        }).collect(Collectors.toList());
    }
}
