package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.class */
public class TestStreamSyncUnitTests {
    @MethodSource({"testCasesFetchNextBatchFromSource"})
    @ParameterizedTest
    void testFetchNextBatchFromSource(Boolean bool, Boolean bool2, Boolean bool3, Boolean bool4, Boolean bool5, Boolean bool6) {
        HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext) Mockito.mock(HoodieSparkEngineContext.class);
        HoodieHadoopStorage hoodieHadoopStorage = new HoodieHadoopStorage((FileSystem) Mockito.mock(FileSystem.class));
        SparkSession sparkSession = (SparkSession) Mockito.mock(SparkSession.class);
        Configuration configuration = (Configuration) Mockito.mock(Configuration.class);
        HoodieStreamer.Config config = new HoodieStreamer.Config();
        config.targetTableName = "testTableName";
        config.targetBasePath = "/fake/table/name";
        config.tableType = "MERGE_ON_READ";
        SourceFormatAdapter sourceFormatAdapter = (SourceFormatAdapter) Mockito.mock(SourceFormatAdapter.class);
        SchemaProvider schemaProvider = getSchemaProvider("InputBatch", false);
        Mockito.when(sourceFormatAdapter.fetchNewDataInRowFormat((Option) ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn(new InputBatch(Option.of(Mockito.mock(Dataset.class)), "chkpt", schemaProvider));
        Mockito.when(sourceFormatAdapter.fetchNewDataInAvroFormat((Option) ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn(new InputBatch(Option.empty(), "chkpt", schemaProvider));
        Mockito.when(sourceFormatAdapter.processErrorEvents((Option) ArgumentMatchers.any(), (ErrorEvent.ErrorReason) ArgumentMatchers.any())).thenReturn(Option.empty());
        Option empty = Option.empty();
        if (bool2.booleanValue()) {
            empty = Option.of(Mockito.mock(Transformer.class));
        }
        SchemaProvider schemaProvider2 = null;
        if (bool3.booleanValue()) {
            schemaProvider2 = getSchemaProvider("UserProvided", bool4.booleanValue());
        }
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.put(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), false);
        Option empty2 = Option.empty();
        if (bool5.booleanValue()) {
            empty2 = Option.of(Mockito.mock(BaseErrorTableWriter.class));
            typedProperties.put(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), true);
        }
        TypedProperties typedProperties2 = (TypedProperties) Mockito.spy(typedProperties);
        StreamSync streamSync = (StreamSync) Mockito.spy(new StreamSync(config, sparkSession, typedProperties2, hoodieSparkEngineContext, hoodieHadoopStorage, configuration, sparkRDDWriteClient -> {
            return true;
        }, schemaProvider2, empty2, sourceFormatAdapter, empty, bool.booleanValue(), false));
        SchemaProvider schemaProvider3 = getSchemaProvider("deduced", false);
        ((StreamSync) Mockito.doReturn(schemaProvider3).when(streamSync)).getDeducedSchemaProvider((Schema) ArgumentMatchers.any(), (SchemaProvider) ArgumentMatchers.any(), (HoodieTableMetaClient) ArgumentMatchers.any());
        InputBatch fetchNextBatchFromSource = streamSync.fetchNextBatchFromSource(Option.empty(), (HoodieTableMetaClient) Mockito.mock(HoodieTableMetaClient.class));
        ((StreamSync) Mockito.verify(streamSync, Mockito.times(1))).getDeducedSchemaProvider((Schema) ArgumentMatchers.any(), (SchemaProvider) ArgumentMatchers.any(), (HoodieTableMetaClient) ArgumentMatchers.any());
        Assertions.assertEquals(schemaProvider3.getTargetSchema(), fetchNextBatchFromSource.getSchemaProvider().getTargetSchema());
        ((TypedProperties) Mockito.verify(typedProperties2, bool6.booleanValue() ? Mockito.times(1) : Mockito.never())).getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), ((Boolean) HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()).booleanValue());
    }

    @MethodSource({"getCheckpointToResumeCases"})
    @ParameterizedTest
    void testGetCheckpointToResume(HoodieStreamer.Config config, HoodieCommitMetadata hoodieCommitMetadata, Option<String> option) throws IOException {
        HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext) Mockito.mock(HoodieSparkEngineContext.class);
        HoodieHadoopStorage hoodieHadoopStorage = new HoodieHadoopStorage((FileSystem) Mockito.mock(FileSystem.class));
        TypedProperties typedProperties = new TypedProperties();
        SparkSession sparkSession = (SparkSession) Mockito.mock(SparkSession.class);
        Configuration configuration = (Configuration) Mockito.mock(Configuration.class);
        HoodieTimeline hoodieTimeline = (HoodieTimeline) Mockito.mock(HoodieTimeline.class);
        HoodieInstant hoodieInstant = (HoodieInstant) Mockito.mock(HoodieInstant.class);
        Mockito.when(hoodieTimeline.filter((Predicate) ArgumentMatchers.any())).thenReturn(hoodieTimeline);
        Mockito.when(hoodieTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant));
        StreamSync streamSync = (StreamSync) Mockito.spy(new StreamSync(config, sparkSession, typedProperties, hoodieSparkEngineContext, hoodieHadoopStorage, configuration, sparkRDDWriteClient -> {
            return true;
        }, (SchemaProvider) null, Option.empty(), (SourceFormatAdapter) null, Option.empty(), true, true));
        ((StreamSync) Mockito.doReturn(Option.of(hoodieCommitMetadata)).when(streamSync)).getLatestCommitMetadataWithValidCheckpointInfo((HoodieTimeline) ArgumentMatchers.any());
        Assertions.assertEquals(option, streamSync.getCheckpointToResume(Option.of(hoodieTimeline)));
    }

    private static Stream<Arguments> getCheckpointToResumeCases() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{generateDeltaStreamerConfig("new-reset-checkpoint", null), generateCommitMetadata("old-reset-checkpoint", null, null), Option.of("new-reset-checkpoint")}), Arguments.of(new Object[]{generateDeltaStreamerConfig("old-reset-checkpoint", null), generateCommitMetadata("old-reset-checkpoint", null, "checkpoint-prev-run"), Option.of("checkpoint-prev-run")}), Arguments.of(new Object[]{generateDeltaStreamerConfig("old-reset-checkpoint", "123445"), generateCommitMetadata("old-reset-checkpoint", "123445", "checkpoint-prev-run"), Option.of("checkpoint-prev-run")}), Arguments.of(new Object[]{generateDeltaStreamerConfig("old-reset-checkpoint", "123445"), generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"), Option.empty()}), Arguments.of(new Object[]{generateDeltaStreamerConfig("new-reset-checkpoint", "123445"), generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"), Option.empty()})});
    }

    private static HoodieStreamer.Config generateDeltaStreamerConfig(String str, String str2) {
        HoodieStreamer.Config config = new HoodieStreamer.Config();
        config.checkpoint = str;
        config.ignoreCheckpoint = str2;
        config.tableType = "MERGE_ON_READ";
        return config;
    }

    private static HoodieCommitMetadata generateCommitMetadata(String str, String str2, String str3) {
        HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata();
        hoodieCommitMetadata.addMetadata("deltastreamer.checkpoint.reset_key", str);
        hoodieCommitMetadata.addMetadata("deltastreamer.checkpoint.ignore_key", str2);
        hoodieCommitMetadata.addMetadata("deltastreamer.checkpoint.key", str3);
        return hoodieCommitMetadata;
    }

    private SchemaProvider getSchemaProvider(String str, boolean z) {
        SchemaProvider schemaProvider = (SchemaProvider) Mockito.mock(SchemaProvider.class);
        Schema schema = (Schema) Mockito.mock(Schema.class);
        Schema schema2 = z ? InputBatch.NULL_SCHEMA : (Schema) Mockito.mock(Schema.class);
        Mockito.when(schemaProvider.getSourceSchema()).thenReturn(schema);
        Mockito.when(schemaProvider.getTargetSchema()).thenReturn(schema2);
        Mockito.when(schema.toString()).thenReturn(str + "SourceSchema");
        if (!z) {
            Mockito.when(schema2.toString()).thenReturn(str + "TargetSchema");
        }
        return schemaProvider;
    }

    static Stream<Arguments> testCasesFetchNextBatchFromSource() {
        Stream.Builder builder = Stream.builder();
        for (Boolean bool : new Boolean[]{false, true}) {
            for (Boolean bool2 : new Boolean[]{false, true}) {
                builder.add(Arguments.of(new Object[]{bool, false, false, false, bool2, Boolean.valueOf(bool2.booleanValue() && !bool.booleanValue())}));
            }
        }
        for (Boolean bool3 : new Boolean[]{false, true}) {
            for (Boolean bool4 : new Boolean[]{false, true}) {
                for (Boolean bool5 : new Boolean[]{false, true}) {
                    for (Boolean bool6 : new Boolean[]{false, true}) {
                        builder.add(Arguments.of(new Object[]{bool3, true, bool4, bool5, bool6, Boolean.valueOf((bool6.booleanValue() && !bool3.booleanValue()) && !(bool5.booleanValue() || !bool4.booleanValue()))}));
                    }
                }
            }
        }
        return builder.build();
    }
}
