package org.apache.flink.connector.jdbc.source;

import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.JdbcDataTestBase;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/jdbc/source/JdbcSourceITCase.class */
public class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase {
    public static Queue<JdbcTestFixture.TestEntry> collectedRecords;
    private final String sql = "select id, title, author, price, qty from books";

    /* loaded from: input_file:org/apache/flink/connector/jdbc/source/JdbcSourceITCase$TestingSinkFunction.class */
    static class TestingSinkFunction implements SinkFunction<JdbcTestFixture.TestEntry> {
        TestingSinkFunction() {
        }

        public void invoke(JdbcTestFixture.TestEntry testEntry, SinkFunction.Context context) throws Exception {
            JdbcSourceITCase.collectedRecords.add(testEntry);
        }
    }

    JdbcSourceITCase() {
    }

    @BeforeEach
    void init() {
        collectedRecords = new ConcurrentLinkedDeque();
    }

    @Test
    void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromSource(JdbcSource.builder().setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)).setSql("select id, title, author, price, qty from books").setDBUrl(getMetadata().getJdbcUrl()).setDriverName(getMetadata().getDriverClass()).setResultExtractor(this.extractor).build(), WatermarkStrategy.noWatermarks(), "TestSource").addSink(new TestingSinkFunction());
        executionEnvironment.execute();
        Assertions.assertThat(collectedRecords).containsExactlyInAnyOrder(JdbcTestFixture.TEST_DATA);
    }

    /* JADX WARN: Type inference failed for: r3v1, types: [java.io.Serializable[], java.io.Serializable[][]] */
    @Test
    void testReadWithoutParallelismWithParamsProvider() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromSource(JdbcSource.builder().setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)).setSql("select id, title, author, price, qty from books where id >= ? and id <= ?").setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider((Serializable[][]) new Serializable[]{new Serializable[]{1001, 1005}, new Serializable[]{1006, 1010}})).setDBUrl(getMetadata().getJdbcUrl()).setDriverName(getMetadata().getDriverClass()).setResultExtractor(this.extractor).build(), WatermarkStrategy.noWatermarks(), "TestSource").addSink(new TestingSinkFunction());
        executionEnvironment.execute();
        Assertions.assertThat(collectedRecords).containsExactlyInAnyOrder(JdbcTestFixture.TEST_DATA);
    }

    @Test
    void testReadWithParallelismWithoutParamsProvider() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        executionEnvironment.setParallelism(2);
        executionEnvironment.fromSource(JdbcSource.builder().setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)).setSql("select id, title, author, price, qty from books").setDBUrl(getMetadata().getJdbcUrl()).setDriverName(getMetadata().getDriverClass()).setResultExtractor(this.extractor).build(), WatermarkStrategy.noWatermarks(), "TestSource").addSink(new TestingSinkFunction());
        executionEnvironment.execute();
        Assertions.assertThat(collectedRecords).containsExactlyInAnyOrder(JdbcTestFixture.TEST_DATA);
    }

    /* JADX WARN: Type inference failed for: r3v1, types: [java.io.Serializable[], java.io.Serializable[][]] */
    @Test
    void testReadWithParallelismWithParamsProvider() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        executionEnvironment.setParallelism(2);
        executionEnvironment.fromSource(JdbcSource.builder().setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)).setSql("select id, title, author, price, qty from books where id >= ? and id <= ?").setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider((Serializable[][]) new Serializable[]{new Serializable[]{1001, 1005}, new Serializable[]{1006, 1010}})).setDBUrl(getMetadata().getJdbcUrl()).setDriverName(getMetadata().getDriverClass()).setResultExtractor(this.extractor).build(), WatermarkStrategy.noWatermarks(), "TestSource").addSink(new TestingSinkFunction());
        executionEnvironment.execute();
        Assertions.assertThat(collectedRecords).containsExactlyInAnyOrder(JdbcTestFixture.TEST_DATA);
    }
}
