package org.apache.hudi.io;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/io/TestHoodieMergedReadHandle.class */
public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness {
    private static Stream<Arguments> avroPayloadClasses() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, OverwriteWithLatestAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, PartialUpdateAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, DefaultHoodieRecordPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, AWSDmsAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, MySqlDebeziumAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, PostgresDebeziumAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, OverwriteWithLatestAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, PartialUpdateAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, DefaultHoodieRecordPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, AWSDmsAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, MySqlDebeziumAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, PostgresDebeziumAvroPayload.class})});
    }

    @MethodSource({"avroPayloadClasses"})
    @ParameterizedTest
    public void testReadLatestRecordsWithDeletes(HoodieTableType hoodieTableType, Class<?> cls) throws IOException {
        HoodieWriteConfig writeConfig = getWriteConfig(cls);
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(hoodieTableType, (Properties) writeConfig.getProps());
        SparkRDDWriteClient hoodieWriteClient = m58getHoodieWriteClient(writeConfig);
        Throwable th = null;
        try {
            String commitTimeAtUTC = HoodieTestDataGenerator.getCommitTimeAtUTC(0L);
            List inserts = HoodieAdaptablePayloadDataGenerator.getInserts(4, "foo", 0L, cls);
            hoodieWriteClient.startCommitWithTime(commitTimeAtUTC);
            Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(inserts, 1), commitTimeAtUTC).collect());
            doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 0L, cls);
            String commitTimeAtUTC2 = HoodieTestDataGenerator.getCommitTimeAtUTC(5L);
            List updates = HoodieAdaptablePayloadDataGenerator.getUpdates(inserts, 5L, cls);
            hoodieWriteClient.startCommitWithTime(commitTimeAtUTC2);
            Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(updates, 1), commitTimeAtUTC2).collect());
            doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 5L, cls);
            String commitTimeAtUTC3 = HoodieTestDataGenerator.getCommitTimeAtUTC(6L);
            hoodieWriteClient.startCommitWithTime(commitTimeAtUTC3);
            Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(HoodieAdaptablePayloadDataGenerator.getDeletes(updates.subList(3, 4), 6L, cls), 1), commitTimeAtUTC3).collect());
            doMergedReadAndValidate(hoodieMetaClient, writeConfig, 3, "foo", 5L, cls);
            String commitTimeAtUTC4 = HoodieTestDataGenerator.getCommitTimeAtUTC(7L);
            hoodieWriteClient.startCommitWithTime(commitTimeAtUTC4);
            Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(HoodieAdaptablePayloadDataGenerator.getDeletesWithEmptyPayload(updates.subList(2, 3)), 1), commitTimeAtUTC4).collect());
            doMergedReadAndValidate(hoodieMetaClient, writeConfig, 2, "foo", 5L, cls);
            String commitTimeAtUTC5 = HoodieTestDataGenerator.getCommitTimeAtUTC(9L);
            List updates2 = HoodieAdaptablePayloadDataGenerator.getUpdates(updates, 9L, cls);
            hoodieWriteClient.startCommitWithTime(commitTimeAtUTC5);
            Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(updates2, 1), commitTimeAtUTC5).collect());
            doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 9L, cls);
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    private static Stream<Arguments> avroPayloadClassesThatHonorOrdering() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, PartialUpdateAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, DefaultHoodieRecordPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, MySqlDebeziumAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.COPY_ON_WRITE, PostgresDebeziumAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, PartialUpdateAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, DefaultHoodieRecordPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, MySqlDebeziumAvroPayload.class}), Arguments.of(new Object[]{HoodieTableType.MERGE_ON_READ, PostgresDebeziumAvroPayload.class})});
    }

    @MethodSource({"avroPayloadClassesThatHonorOrdering"})
    @ParameterizedTest
    public void testReadLatestRecordsWithLateArrivedRecords(HoodieTableType hoodieTableType, Class<?> cls) throws IOException {
        HoodieWriteConfig writeConfig = getWriteConfig(cls);
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(hoodieTableType, (Properties) writeConfig.getProps());
        SparkRDDWriteClient hoodieWriteClient = m58getHoodieWriteClient(writeConfig);
        Throwable th = null;
        try {
            try {
                String commitTimeAtUTC = HoodieTestDataGenerator.getCommitTimeAtUTC(0L);
                List inserts = HoodieAdaptablePayloadDataGenerator.getInserts(4, "foo", 0L, cls);
                hoodieWriteClient.startCommitWithTime(commitTimeAtUTC);
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(inserts, 1), commitTimeAtUTC).collect());
                doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 0L, cls);
                String commitTimeAtUTC2 = HoodieTestDataGenerator.getCommitTimeAtUTC(5L);
                List updates = HoodieAdaptablePayloadDataGenerator.getUpdates(inserts, 5L, cls);
                hoodieWriteClient.startCommitWithTime(commitTimeAtUTC2);
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(updates, 1), commitTimeAtUTC2).collect());
                doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 5L, cls);
                String commitTimeAtUTC3 = HoodieTestDataGenerator.getCommitTimeAtUTC(6L);
                List updates2 = HoodieAdaptablePayloadDataGenerator.getUpdates(inserts, 1L, cls);
                hoodieWriteClient.startCommitWithTime(commitTimeAtUTC3);
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(updates2, 1), commitTimeAtUTC3).collect());
                doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 5L, cls);
                String commitTimeAtUTC4 = HoodieTestDataGenerator.getCommitTimeAtUTC(9L);
                List updates3 = HoodieAdaptablePayloadDataGenerator.getUpdates(updates, 9L, cls);
                hoodieWriteClient.startCommitWithTime(commitTimeAtUTC4);
                Assertions.assertNoWriteErrors(hoodieWriteClient.upsert(jsc().parallelize(updates3, 1), commitTimeAtUTC4).collect());
                doMergedReadAndValidate(hoodieMetaClient, writeConfig, 4, "foo", 9L, cls);
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    private void doMergedReadAndValidate(HoodieTableMetaClient hoodieTableMetaClient, HoodieWriteConfig hoodieWriteConfig, int i, String str, long j, Class<?> cls) throws IOException {
        String orderingField = new HoodieAdaptablePayloadDataGenerator.RecordGen(cls).getOrderingField();
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, m57context(), HoodieTableMetaClient.reload(hoodieTableMetaClient));
        List list = (List) create.getHoodieView().getLatestBaseFiles(str).map(hoodieBaseFile -> {
            return Pair.of(str, hoodieBaseFile.getFileId());
        }).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(1, list.size());
        List mergedRecords = new HoodieMergedReadHandle(hoodieWriteConfig, Option.of(((HoodieInstant) create.getActiveTimeline().lastInstant().get()).getTimestamp()), create, (Pair) list.get(0)).getMergedRecords();
        org.junit.jupiter.api.Assertions.assertEquals(i, mergedRecords.size());
        List list2 = (List) mergedRecords.stream().sorted(Comparator.comparing((v0) -> {
            return v0.getRecordKey();
        })).collect(Collectors.toList());
        for (int i2 = 0; i2 < list2.size(); i2++) {
            HoodieRecord hoodieRecord = (HoodieRecord) list2.get(i2);
            org.junit.jupiter.api.Assertions.assertEquals(i2, Integer.parseInt(hoodieRecord.getRecordKey()));
            org.junit.jupiter.api.Assertions.assertEquals(str, hoodieRecord.getPartitionPath());
            org.junit.jupiter.api.Assertions.assertEquals(cls.getName(), hoodieRecord.getData().getClass().getName());
            Option insertValue = ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(HoodieAdaptablePayloadDataGenerator.SCHEMA_WITH_METAFIELDS);
            org.junit.jupiter.api.Assertions.assertTrue(insertValue.isPresent());
            GenericRecord genericRecord = (GenericRecord) insertValue.get();
            org.junit.jupiter.api.Assertions.assertEquals(i2, Integer.parseInt(genericRecord.get("id").toString()));
            org.junit.jupiter.api.Assertions.assertEquals(str, genericRecord.get("pt").toString());
            org.junit.jupiter.api.Assertions.assertEquals(j, Long.parseLong(genericRecord.get(orderingField).toString()));
        }
    }

    private HoodieWriteConfig getWriteConfig(Class<?> cls) {
        return getConfigBuilder(true).withProperties(HoodieAdaptablePayloadDataGenerator.getKeyGenProps(cls)).withParallelism(2, 2).withBulkInsertParallelism(2).withDeleteParallelism(1).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).withSchema(HoodieAdaptablePayloadDataGenerator.SCHEMA_STR).withPayloadConfig(HoodiePayloadConfig.newBuilder().fromProperties(HoodieAdaptablePayloadDataGenerator.getPayloadProps(cls)).build()).build();
    }
}
