package org.apache.flink.connector.jdbc.table;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.class */
public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setConfiguration(new Configuration()).build());
    private final TableRow inputTable = createInputTable();
    public static StreamExecutionEnvironment env;
    public static TableEnvironment tEnv;

    /* loaded from: input_file:org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase$Caching.class */
    private enum Caching {
        ENABLE_CACHE,
        DISABLE_CACHE
    }

    protected TableRow createInputTable() {
        return TableBuilder.tableRow(JdbcFilterPushdownPreparedStatementVisitorTest.INPUT_TABLE, TableBuilder.field("id", DataTypes.BIGINT().notNull()), TableBuilder.field("decimal_col", DataTypes.DECIMAL(10, 4)), TableBuilder.field("timestamp6_col", DataTypes.TIMESTAMP(6)));
    }

    @Override // org.apache.flink.connector.jdbc.testutils.DatabaseTest
    public List<TableManaged> getManagedTables() {
        return Collections.singletonList(this.inputTable);
    }

    protected List<Row> getTestData() {
        return Arrays.asList(Row.of(new Object[]{1L, BigDecimal.valueOf(100.1234d), LocalDateTime.parse("2020-01-01T15:35:00.123456")}), Row.of(new Object[]{2L, BigDecimal.valueOf(101.1234d), LocalDateTime.parse("2020-01-01T15:36:01.123456")}));
    }

    @BeforeEach
    void beforeAll() throws SQLException {
        Connection connection = getMetadata().getConnection();
        try {
            this.inputTable.insertIntoTableValues(connection, getTestData());
            if (connection != null) {
                connection.close();
            }
            env = StreamExecutionEnvironment.getExecutionEnvironment();
            tEnv = StreamTableEnvironment.create(env);
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @AfterEach
    void afterEach() {
        StreamTestSink.clear();
    }

    @Test
    void testJdbcSource() {
        tEnv.executeSql(this.inputTable.getCreateQueryForFlink(getMetadata(), "testTable"));
        Assertions.assertThat(executeQuery("SELECT * FROM testTable")).containsExactlyInAnyOrderElementsOf(getTestData());
    }

    @Test
    void testProject() {
        tEnv.executeSql(this.inputTable.getCreateQueryForFlink(getMetadata(), "testTable", Arrays.asList("'scan.partition.column'='id'", "'scan.partition.num'='2'", "'scan.partition.lower-bound'='0'", "'scan.partition.upper-bound'='100'")));
        Assertions.assertThat(executeQuery(String.format("SELECT %s FROM %s", String.join(",", (CharSequence[]) Arrays.copyOfRange(this.inputTable.getTableFields(), 0, 3)), "testTable"))).containsExactlyInAnyOrderElementsOf((List) getTestData().stream().map(row -> {
            return Row.of(new Object[]{row.getField(0), row.getField(1), row.getField(2)});
        }).collect(Collectors.toList()));
    }

    @Test
    public void testLimit() throws Exception {
        tEnv.executeSql(this.inputTable.getCreateQueryForFlink(getMetadata(), "testTable", Arrays.asList("'scan.partition.column'='id'", "'scan.partition.num'='2'", "'scan.partition.lower-bound'='1'", "'scan.partition.upper-bound'='2'")));
        List<Row> executeQuery = executeQuery("SELECT * FROM testTable LIMIT 1");
        Assertions.assertThat(executeQuery).hasSize(1);
        Assertions.assertThat(getTestData()).as("The actual output is not a subset of the expected set.", new Object[0]).containsAll(executeQuery);
    }

    @Test
    public void testFilter() throws Exception {
        tEnv.executeSql(this.inputTable.getCreateQueryForFlink(getMetadata(), "testTable"));
        tEnv.executeSql(this.inputTable.getCreateQueryForFlink(getMetadata(), "PARTITIONED_TABLE", Arrays.asList("'scan.partition.column'='id'", "'scan.partition.num'='1'", "'scan.partition.lower-bound'='1'", "'scan.partition.upper-bound'='1'")));
        tEnv.executeSql(String.format("CREATE VIEW FAKE_TABLE (idx, %s) as (SELECT * from %s )", Arrays.stream(this.inputTable.getTableFields()).filter(str -> {
            return !str.equals("id");
        }).collect(Collectors.joining(",")), "testTable"));
        Row orElseThrow = getTestData().stream().filter(row -> {
            return row.getFieldAs(0).equals(1L);
        }).findAny().orElseThrow(NullPointerException::new);
        Row orElseThrow2 = getTestData().stream().filter(row2 -> {
            return row2.getFieldAs(0).equals(2L);
        }).findAny().orElseThrow(NullPointerException::new);
        List<Row> testData = getTestData();
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1")).containsExactly(new Row[]{orElseThrow});
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'")).containsExactly(new Row[]{orElseThrow});
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE 1 = idx AND decimal_col IN (100.1234, 101.1234)")).containsExactly(new Row[]{orElseThrow});
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1 AND decimal_col = 100.1234 OR decimal_col = 101.1234")).containsExactlyInAnyOrderElementsOf(testData);
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE (2 = idx AND decimal_col = 100.1234) OR decimal_col = 101.1234")).containsExactly(new Row[]{orElseThrow2});
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 2 AND decimal_col > 100 OR decimal_col = 101.123")).containsExactly(new Row[]{orElseThrow2});
        Assertions.assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (decimal_col = 100.1234 OR decimal_col = 102.1234)")).isEmpty();
        Assertions.assertThat(executeQuery("SELECT * FROM PARTITIONED_TABLE WHERE id = 2 AND decimal_col > 100 OR decimal_col = 101.123")).isEmpty();
        Assertions.assertThat(executeQuery("SELECT * FROM PARTITIONED_TABLE WHERE 1 = id AND decimal_col IN (100.1234, 101.1234)")).containsExactly(new Row[]{orElseThrow});
    }

    @EnumSource(Caching.class)
    @ParameterizedTest
    void testLookupJoin(Caching caching) throws Exception {
        List<String> emptyList = Collections.emptyList();
        if (caching.equals(Caching.ENABLE_CACHE)) {
            emptyList = Arrays.asList("'lookup.cache.max-rows' = '100'", "'lookup.cache.ttl' = '10min'");
        }
        tEnv.executeSql(this.inputTable.getCreateQueryForFlink(getMetadata(), "jdbc_lookup", emptyList));
        tEnv.executeSql(String.format("CREATE TABLE value_source (  `id` BIGINT,  `name` STRING,  `proctime` AS PROCTIME()) WITH ( 'connector' = 'values',  'data-id' = '%s')", TestValuesTableFactory.registerData(Arrays.asList(Row.of(new Object[]{1L, "Alice"}), Row.of(new Object[]{1L, "Alice"}), Row.of(new Object[]{2L, "Bob"}), Row.of(new Object[]{3L, "Charlie"})))));
        if (caching == Caching.ENABLE_CACHE) {
            LookupCacheManager.keepCacheOnRelease(true);
        }
        try {
            List<Row> executeQuery = executeQuery("SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id");
            Assertions.assertThat(executeQuery).hasSize(3);
            Assertions.assertThat(executeQuery).as("The actual output is not a subset of the expected set", new Object[0]).containsAll(Arrays.asList(Row.of(new Object[]{1L, "Alice", 1L, LocalDateTime.parse("2020-01-01T15:35:00.123456"), BigDecimal.valueOf(100.1234d)}), Row.of(new Object[]{1L, "Alice", 1L, LocalDateTime.parse("2020-01-01T15:35:00.123456"), BigDecimal.valueOf(100.1234d)}), Row.of(new Object[]{2L, "Bob", 2L, LocalDateTime.parse("2020-01-01T15:36:01.123456"), BigDecimal.valueOf(101.1234d)})));
            if (caching == Caching.ENABLE_CACHE) {
                validateCachedValues();
            }
        } finally {
            if (caching == Caching.ENABLE_CACHE) {
                LookupCacheManager.getInstance().checkAllReleased();
                LookupCacheManager.getInstance().clear();
                LookupCacheManager.keepCacheOnRelease(false);
            }
        }
    }

    private List<Row> executeQuery(String str) {
        return CollectionUtil.iteratorToList(tEnv.executeSql(str).collect());
    }

    private void validateCachedValues() {
        Map managedCaches = LookupCacheManager.getInstance().getManagedCaches();
        Assertions.assertThat(managedCaches).as("There should be only 1 shared cache registered", new Object[0]).hasSize(1);
        LookupCache cache = ((LookupCacheManager.RefCountedCache) managedCaches.get(managedCaches.keySet().iterator().next())).getCache();
        GenericRowData of = GenericRowData.of(new Object[]{1L});
        GenericRowData of2 = GenericRowData.of(new Object[]{1L, DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234d), 10, 4), TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))});
        GenericRowData of3 = GenericRowData.of(new Object[]{2L});
        GenericRowData of4 = GenericRowData.of(new Object[]{2L, DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234d), 10, 4), TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))});
        GenericRowData of5 = GenericRowData.of(new Object[]{3L});
        HashMap hashMap = new HashMap();
        hashMap.put(of, Collections.singletonList(of2));
        hashMap.put(of3, Collections.singletonList(of4));
        hashMap.put(of5, Collections.emptyList());
        LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(hashMap);
    }
}
