package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestJdbcSource.class */
public class TestJdbcSource extends UtilitiesTestBase {
    private static final TypedProperties PROPS = new TypedProperties();
    private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
    private static Connection connection;

    @Override // org.apache.hudi.utilities.testutils.UtilitiesTestBase
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        PROPS.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.user", "test");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
        connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
    }

    @Override // org.apache.hudi.utilities.testutils.UtilitiesTestBase
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
        JdbcTestUtils.close(connection);
    }

    @Test
    public void testSingleCommit() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(100, JdbcTestUtils.count(connection, "triprec"));
            Assertions.assertEquals(100, ((Dataset) runSource(Option.empty(), 100).getBatch().get()).count());
        } catch (SQLException e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testInsertAndUpdate() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.update("007", (List) JdbcTestUtils.clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS).stream().limit(50L).collect(Collectors.toList()), connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(100, JdbcTestUtils.count(connection, "triprec"));
            Dataset dataset = (Dataset) runSource(Option.empty(), 100L).getBatch().get();
            Assertions.assertEquals(100L, dataset.count());
            Assertions.assertEquals(50L, dataset.where("commit_time=000").count());
            Assertions.assertEquals(50L, dataset.where("commit_time=007").count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testTwoCommits() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(10L, ((Dataset) runSource(Option.empty(), 10L).getBatch().get()).where("commit_time=000").count());
            JdbcTestUtils.insert("001", 5, connection, DATA_GENERATOR, PROPS);
            Dataset dataset = (Dataset) runSource(Option.empty(), 15L).getBatch().get();
            Assertions.assertEquals(15L, dataset.count());
            Assertions.assertEquals(5L, dataset.where("commit_time=001").count());
            Assertions.assertEquals(10L, dataset.where("commit_time=000").count());
            Assertions.assertEquals(15L, ((Dataset) runSource(Option.empty(), 15L).getBatch().get()).count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testIncrementalFetchWithCommitTime() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource = runSource(Option.empty(), 10L);
            Assertions.assertEquals(10L, ((Dataset) runSource.getBatch().get()).count());
            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            Dataset dataset = (Dataset) runSource(Option.of(runSource.getCheckpointForNextBatch()), 10L).getBatch().get();
            Assertions.assertEquals(10L, dataset.count());
            Assertions.assertEquals(10L, dataset.where("commit_time=001").count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testIncrementalFetchWithNoMatchingRows() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource = runSource(Option.empty(), 10L);
            Assertions.assertEquals(10L, ((Dataset) runSource.getBatch().get()).count());
            Assertions.assertEquals(0L, ((Dataset) runSource(Option.of(runSource.getCheckpointForNextBatch()), 10L).getBatch().get()).count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
        try {
            JdbcTestUtils.clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource = runSource(Option.empty(), 100L);
            Assertions.assertEquals(100L, ((Dataset) runSource.getBatch().get()).count());
            JdbcTestUtils.insert("001", 100, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource2 = runSource(Option.of(runSource.getCheckpointForNextBatch()), 60L);
            Dataset dataset = (Dataset) runSource2.getBatch().get();
            Assertions.assertEquals(60L, dataset.count());
            Assertions.assertEquals(60L, dataset.where("commit_time=001").count());
            Assertions.assertEquals(40L, ((Dataset) runSource(Option.of(runSource2.getCheckpointForNextBatch()), 75L).getBatch().get()).count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
        try {
            JdbcTestUtils.clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource = runSource(Option.empty(), 100L);
            Assertions.assertEquals(100L, ((Dataset) runSource.getBatch().get()).count());
            Assertions.assertEquals("100", runSource.getCheckpointForNextBatch());
            JdbcTestUtils.insert("001", 100, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(0L, ((Dataset) runSource(Option.of("200"), 50L).getBatch().get()).count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testIncrementalFetchFallbackToFullFetchWhenError() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource = runSource(Option.empty(), 10L);
            Assertions.assertEquals(10L, ((Dataset) runSource.getBatch().get()).count());
            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "dummy_col");
            Assertions.assertThrows(HoodieException.class, () -> {
                runSource(Option.of(runSource.getCheckpointForNextBatch()), -1L);
            });
            PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch", "true");
            Assertions.assertTrue(((Exception) Assertions.assertThrows(HoodieException.class, () -> {
                runSource(Option.of(runSource.getCheckpointForNextBatch()), -1L);
            })).getMessage().contains("Failed to checkpoint"));
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testFullFetchWithCommitTime() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(10L, ((Dataset) runSource(Option.empty(), 10L).getBatch().get()).count());
            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            Dataset dataset = (Dataset) runSource(Option.empty(), 20L).getBatch().get();
            Assertions.assertEquals(20L, dataset.count());
            Assertions.assertEquals(10L, dataset.where("commit_time=000").count());
            Assertions.assertEquals(10L, dataset.where("commit_time=001").count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testFullFetchWithCheckpoint() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> runSource = runSource(Option.empty(), 10L);
            Dataset dataset = (Dataset) runSource.getBatch().get();
            Assertions.assertEquals(10L, dataset.count());
            Assertions.assertEquals("", runSource.getCheckpointForNextBatch());
            String string = ((Row) dataset.agg(functions.max(dataset.col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name"))).cast(DataTypes.StringType), new Column[0]).first()).getString(0);
            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            Dataset dataset2 = (Dataset) runSource(Option.of(string), 10L).getBatch().get();
            Assertions.assertEquals(10L, dataset2.count());
            Assertions.assertEquals(10L, dataset2.where("commit_time=001").count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testSourceWithPasswordOnFs() {
        try {
            writeSecretToFs();
            PROPS.remove("hoodie.deltastreamer.jdbc.password");
            PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file", "file:///tmp/hudi/config/secret");
            PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(10L, ((Dataset) runSource(Option.empty(), 10L).getBatch().get()).count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testSourceWithNoPasswordThrowsException() {
        Assertions.assertThrows(HoodieException.class, () -> {
            writeSecretToFs();
            PROPS.remove("hoodie.deltastreamer.jdbc.password");
            PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            runSource(Option.empty(), 10L);
        });
    }

    @Test
    public void testSourceWithExtraOptions() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize", "10");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name");
        try {
            JdbcTestUtils.clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(10L, ((Dataset) runSource(Option.empty(), 10L).getBatch().get()).count());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    @Test
    public void testSourceWithStorageLevel() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Dataset dataset = (Dataset) runSource(Option.empty(), 10L).getBatch().get();
            Assertions.assertEquals(10L, dataset.count());
            Assertions.assertEquals(StorageLevel.NONE(), dataset.storageLevel());
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    private void writeSecretToFs() throws IOException {
        FSDataOutputStream create = FileSystem.get(new Configuration()).create(new Path("file:///tmp/hudi/config/secret"));
        create.writeBytes("jdbc");
        create.close();
    }

    private InputBatch<Dataset<Row>> runSource(Option<String> option, long j) {
        return new JdbcSource(PROPS, this.jsc, this.sparkSession, (SchemaProvider) null).fetchNewData(option, j);
    }
}
