package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.class */
public class TestBucketPartitionerFlinkIcebergSink {
    /** Number of task managers started in the embedded Flink mini cluster. */
    private static final int NUMBER_TASK_MANAGERS = 1;

    /** Number of task slots offered by each task manager of the mini cluster. */
    private static final int SLOTS_PER_TASK_MANAGER = 8;

    /** Embedded Flink mini cluster shared by every test in this class. */
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
                            .setConfiguration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
                            .build());

    /** Catalog extension that supplies the catalog and table loader for the test table. */
    @RegisterExtension
    private static final HadoopCatalogExtension catalogExtension =
            new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);

    /** Flink type information for the external {@link Row} form of the test schema. */
    private static final TypeInformation<Row> ROW_TYPE_INFO =
            new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());

    /**
     * Job and writer parallelism: every slot of every task manager (1 * 8 = 8), i.e. more
     * writers than buckets.
     */
    private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;

    /** File format configured as the table's default write format. */
    private final FileFormat format = FileFormat.PARQUET;

    /** Number of buckets requested from the partition spec under test. */
    private final int numBuckets = 4;

    // Per-test state, (re)initialised by setupEnvironment(...).
    private Table table;
    private StreamExecutionEnvironment env;
    private TableLoader tableLoader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink$TableTestStats.class */
    public static class TableTestStats {
        /** Sum of the record counts of all data files that were inspected. */
        final int totalRowCount;

        /** Bucket id mapped to the writer ids parsed from the names of the files in that bucket. */
        final Map<Integer, List<Integer>> writersPerBucket;

        /** Bucket id mapped to the number of data files found for that bucket. */
        final Map<Integer, Integer> numFilesPerBucket;

        /** Writer id mapped to the total number of records in the files attributed to that writer. */
        final Map<Integer, Long> rowsPerWriter;

        /**
         * Bundles the statistics gathered from one scan of the table. The maps are stored by
         * reference; no defensive copies are made.
         *
         * @param totalRowCount total number of records across all scanned files
         * @param writersPerBucket writer ids observed per bucket id
         * @param numFilesPerBucket number of data files per bucket id
         * @param rowsPerWriter number of records per writer id
         */
        TableTestStats(
                int totalRowCount,
                Map<Integer, List<Integer>> writersPerBucket,
                Map<Integer, Integer> numFilesPerBucket,
                Map<Integer, Long> rowsPerWriter) {
            this.totalRowCount = totalRowCount;
            this.writersPerBucket = writersPerBucket;
            this.numFilesPerBucket = numFilesPerBucket;
            this.rowsPerWriter = rowsPerWriter;
        }
    }

    /**
     * Creates the test table and a fresh execution environment for one parameterized run.
     *
     * <p>The table is created through the catalog extension with the shared test schema, the
     * partition spec that {@code tableSchemaType} produces for {@code numBuckets} buckets, and
     * {@code format} as its default write format. The environment checkpoints with an interval
     * of 100 and runs with {@code parallelism}, allowing a max parallelism of twice that value.
     *
     * @param tableSchemaType schema variant that determines the partition spec of the table
     */
    private void setupEnvironment(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) {
        this.table =
                catalogExtension
                        .catalog()
                        .createTable(
                                TestFixtures.TABLE_IDENTIFIER,
                                SimpleDataUtil.SCHEMA,
                                tableSchemaType.getPartitionSpec(this.numBuckets),
                                ImmutableMap.of("write.format.default", this.format.name()));
        this.env =
                StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
                        .enableCheckpointing(100L)
                        .setParallelism(this.parallelism)
                        // Twice the job parallelism (8 * 2 = 16).
                        .setMaxParallelism(this.parallelism * 2);
        this.tableLoader = catalogExtension.tableLoader();
    }

    /**
     * Writes {@code list} to the test table through {@link FlinkSink} and verifies the result.
     *
     * <p>The rows are converted to external {@link Row}s, emitted by a {@link BoundedTestSource},
     * mapped back to {@link RowData} and appended with {@link DistributionMode#HASH} using
     * {@code parallelism} writers. After the job has finished, the table content is asserted to
     * equal {@code list}.
     *
     * @param list rows to append
     * @throws Exception if the Flink job or the table verification fails
     */
    private void appendRowsToTable(List<RowData> list) throws Exception {
        DataFormatConverters.RowConverter converter =
                new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());

        // The bounded source is typed on Row, so hand it the external representation.
        Row[] externalRows = list.stream().map(converter::toExternal).toArray(Row[]::new);
        DataStreamSource<Row> source =
                this.env.addSource(new BoundedTestSource<Row>(externalRows), ROW_TYPE_INFO);

        FlinkSink.forRowData(
                        source.map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)))
                .table(this.table)
                .tableLoader(this.tableLoader)
                .writeParallelism(this.parallelism)
                .distributionMode(DistributionMode.HASH)
                .append();

        this.env.execute("Test Iceberg DataStream");
        SimpleDataUtil.assertTableRows(this.table, list);
    }

    /**
     * Verifies that rows are spread evenly over all buckets and writers.
     *
     * <p>Expects every one of the {@code numBuckets} buckets to have been written by exactly two
     * writers, with ids {@code bucket} and {@code bucket + numBuckets}, producing two files per
     * bucket. The rows-per-writer check covers writer ids {@code 0..numBuckets-1} only.
     *
     * @param tableSchemaType schema variant under test
     * @throws Exception if the job or the table scan fails
     */
    @EnumSource(value = TestBucketPartitionerUtil.TableSchemaType.class, names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
    @ParameterizedTest
    public void testSendRecordsToAllBucketsEvenly(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) throws Exception {
        setupEnvironment(tableSchemaType);
        List<RowData> rows = generateTestDataRows();
        appendRowsToTable(rows);
        TableTestStats stats = extractPartitionResults(tableSchemaType);

        Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
        // Every bucket must have received data.
        Assertions.assertThat(stats.writersPerBucket.size()).isEqualTo(this.numBuckets);
        Assertions.assertThat(stats.numFilesPerBucket.size()).isEqualTo(this.numBuckets);

        for (int bucket = 0; bucket < this.numBuckets; bucket++) {
            // The second writer of a bucket sits numBuckets positions after the first one.
            int secondWriter = bucket + this.numBuckets;
            Assertions.assertThat(stats.writersPerBucket.get(bucket))
                    .hasSameElementsAs(Arrays.asList(bucket, secondWriter));
            Assertions.assertThat(stats.numFilesPerBucket.get(bucket)).isEqualTo(2);
            // rowsPerWriter is keyed by writer id; only ids 0..numBuckets-1 are checked here.
            Assertions.assertThat(stats.rowsPerWriter.get(bucket)).isEqualTo(2L);
        }
    }

    /**
     * Verifies the outcome for a table whose spec is the {@code TWO_BUCKETS} variant: all rows
     * are written and each bucket id {@code 0..numBuckets-1} ends up with exactly one data file.
     *
     * @param tableSchemaType schema variant under test
     * @throws Exception if the job or the table scan fails
     */
    @EnumSource(value = TestBucketPartitionerUtil.TableSchemaType.class, names = {"TWO_BUCKETS"})
    @ParameterizedTest
    public void testMultipleBucketsFallback(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) throws Exception {
        setupEnvironment(tableSchemaType);
        List<RowData> rows = generateTestDataRows();
        appendRowsToTable(rows);
        TableTestStats stats = extractPartitionResults(tableSchemaType);

        Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
        for (int bucket = 0; bucket < this.numBuckets; bucket++) {
            // Asserting on the boxed value reports a missing bucket as an assertion failure
            // instead of a NullPointerException from unboxing.
            Assertions.assertThat(stats.numFilesPerBucket.get(bucket)).isEqualTo(1);
        }
    }

    /**
     * Generates the input rows shared by the tests in this class.
     *
     * <p>Requests {@code (parallelism * 2) / numBuckets} (16 / 4 = 4) as the first argument and
     * {@code numBuckets} as the second argument of
     * {@code TestBucketPartitionerUtil.generateRowsForBucketIdRange}.
     * NOTE(review): the first argument is presumably the number of rows per bucket, giving 16
     * rows in total; confirm against the helper.
     *
     * @return the generated rows
     */
    private List<RowData> generateTestDataRows() {
        int totalRows = this.parallelism * 2;
        int rowsPerBucket = totalRows / this.numBuckets;
        return TestBucketPartitionerUtil.generateRowsForBucketIdRange(rowsPerBucket, this.numBuckets);
    }

    /**
     * Scans the table and aggregates per-bucket and per-writer statistics from its data files.
     *
     * <p>For every planned file the writer id is parsed from the integer prefix of the file name
     * (the text before the first {@code '-'}), and the bucket id is read from the file's
     * partition tuple at the position reported by {@code tableSchemaType}.
     *
     * @param tableSchemaType schema variant; supplies the position of the bucket partition column
     * @return total row count, writer ids per bucket, file count per bucket and rows per writer
     * @throws IOException if closing the scan's file listing fails
     */
    private TableTestStats extractPartitionResults(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) throws IOException {
        long totalRows = 0L;
        Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap();
        Map<Integer, Integer> filesPerBucket = Maps.newHashMap();
        Map<Integer, Long> rowsPerWriter = Maps.newHashMap();

        try (CloseableIterable<FileScanTask> tasks = this.table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                long recordCount = task.file().recordCount();

                // The file name is the last path segment; its leading number is the writer id.
                String[] pathSegments = task.file().path().toString().split("/");
                String fileName = pathSegments[pathSegments.length - 1];
                int writerId = Integer.parseInt(fileName.split("-")[0]);

                int bucketId =
                        task.file()
                                .partition()
                                .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class);

                totalRows += recordCount;
                writersPerBucket.computeIfAbsent(bucketId, ignored -> Lists.newArrayList()).add(writerId);
                filesPerBucket.merge(bucketId, 1, Integer::sum);
                rowsPerWriter.merge(writerId, recordCount, Long::sum);
            }
        }

        // Narrowing once at the end yields the same value as narrowing after every addition.
        return new TableTestStats((int) totalRows, writersPerBucket, filesPerBucket, rowsPerWriter);
    }

    /**
     * Compiler-synthesized deserialization hook for the serializable lambdas of this class, as
     * recovered by the decompiler.
     *
     * <p>It recognises a single lambda: an implementation method named {@code toInternal}, bound
     * to the {@code MapFunction.map} functional interface, whose only captured argument is a
     * {@link DataFormatConverters.RowConverter}. Any other descriptor is rejected with an
     * {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this is decompiler output rather than valid source ({@code boolean z = -1},
     * the undefined register {@code r0}). javac normally generates this method itself for
     * serializable lambdas, so it should presumably be removed before the file is recompiled.
     *
     * @param serializedLambda serialized form of the lambda to re-create
     * @return the re-created lambda
     * @throws IllegalArgumentException if the descriptor does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiled form of a switch over a String: the hash code selects a candidate, equals()
        // confirms it, and the second switch dispatches on the resulting index.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 550342232:
                if (implMethodName.equals("toInternal")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Method kind 5 corresponds to MethodHandleInfo.REF_invokeVirtual.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/data/util/DataFormatConverters$DataFormatConverter") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    DataFormatConverters.RowConverter rowConverter = (DataFormatConverters.RowConverter) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        return r0.toInternal(v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
