package org.apache.flink.connector.jdbc.table;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.class */
public class JdbcDynamicTableSourceITCase {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setConfiguration(new Configuration()).build());
    public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
    public static final String DB_URL = "jdbc:derby:memory:test";
    public static final String INPUT_TABLE = "jdbDynamicTableSource";
    public static StreamExecutionEnvironment env;
    public static TableEnvironment tEnv;

    /* loaded from: input_file:org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase$Caching.class */
    private enum Caching {
        ENABLE_CACHE,
        DISABLE_CACHE
    }

    @BeforeAll
    static void beforeAll() throws ClassNotFoundException, SQLException {
        System.setProperty("derby.stream.error.field", JdbcTestBase.class.getCanonicalName() + ".DEV_NULL");
        Class.forName(DRIVER_CLASS);
        Connection connection = DriverManager.getConnection("jdbc:derby:memory:test;create=true");
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.executeUpdate("CREATE TABLE jdbDynamicTableSource (id BIGINT NOT NULL,timestamp6_col TIMESTAMP, timestamp9_col TIMESTAMP, time_col TIME, real_col FLOAT(23), double_col FLOAT(24),decimal_col DECIMAL(10, 4))");
                    createStatement.executeUpdate("INSERT INTO jdbDynamicTableSource VALUES (1, TIMESTAMP('2020-01-01 15:35:00.123456'), TIMESTAMP('2020-01-01 15:35:00.123456789'), TIME('15:35:00'), 1.175E-37, 1.79769E+308, 100.1234)");
                    createStatement.executeUpdate("INSERT INTO jdbDynamicTableSource VALUES (2, TIMESTAMP('2020-01-01 15:36:01.123456'), TIMESTAMP('2020-01-01 15:36:01.123456789'), TIME('15:36:01'), -1.175E-37, -1.79769E+308, 101.1234)");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    if (connection != null) {
                        if (0 == 0) {
                            connection.close();
                            return;
                        }
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    connection.close();
                }
            }
            throw th8;
        }
    }

    @AfterAll
    static void afterAll() throws Exception {
        Class.forName(DRIVER_CLASS);
        Connection connection = DriverManager.getConnection(DB_URL);
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.executeUpdate("DROP TABLE jdbDynamicTableSource");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    StreamTestSink.clear();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    @BeforeEach
    void before() throws Exception {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        tEnv = StreamTableEnvironment.create(env);
    }

    @Test
    void testJdbcSource() throws Exception {
        tEnv.executeSql("CREATE TABLE jdbDynamicTableSource(id BIGINT,timestamp6_col TIMESTAMP(6),timestamp9_col TIMESTAMP(9),time_col TIME,real_col FLOAT,double_col DOUBLE,decimal_col DECIMAL(10, 4)) WITH (  'connector'='jdbc',  'url'='jdbc:derby:memory:test',  'table-name'='jdbDynamicTableSource')");
        Assertions.assertThat((List) CollectionUtil.iteratorToList(tEnv.executeSql("SELECT * FROM jdbDynamicTableSource").collect()).stream().map((v0) -> {
            return v0.toString();
        }).sorted().collect(Collectors.toList())).isEqualTo((List) Stream.of((Object[]) new String[]{"+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]", "+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]"}).sorted().collect(Collectors.toList()));
    }

    @Test
    void testProject() throws Exception {
        tEnv.executeSql("CREATE TABLE jdbDynamicTableSource(id BIGINT,timestamp6_col TIMESTAMP(6),timestamp9_col TIMESTAMP(9),time_col TIME,real_col FLOAT,double_col DOUBLE,decimal_col DECIMAL(10, 4)) WITH (  'connector'='jdbc',  'url'='jdbc:derby:memory:test',  'table-name'='jdbDynamicTableSource',  'scan.partition.column'='id',  'scan.partition.num'='2',  'scan.partition.lower-bound'='0',  'scan.partition.upper-bound'='100')");
        Assertions.assertThat((List) CollectionUtil.iteratorToList(tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM jdbDynamicTableSource").collect()).stream().map((v0) -> {
            return v0.toString();
        }).sorted().collect(Collectors.toList())).isEqualTo((List) Stream.of((Object[]) new String[]{"+I[1, 2020-01-01T15:35:00.123456, 100.1234]", "+I[2, 2020-01-01T15:36:01.123456, 101.1234]"}).sorted().collect(Collectors.toList()));
    }

    @Test
    void testLimit() throws Exception {
        tEnv.executeSql("CREATE TABLE jdbDynamicTableSource(\nid BIGINT,\ntimestamp6_col TIMESTAMP(6),\ntimestamp9_col TIMESTAMP(9),\ntime_col TIME,\nreal_col FLOAT,\ndouble_col DOUBLE,\ndecimal_col DECIMAL(10, 4)\n) WITH (\n  'connector'='jdbc',\n  'url'='jdbc:derby:memory:test',\n  'table-name'='jdbDynamicTableSource',\n  'scan.partition.column'='id',\n  'scan.partition.num'='2',\n  'scan.partition.lower-bound'='1',\n  'scan.partition.upper-bound'='2'\n)");
        List list = (List) CollectionUtil.iteratorToList(tEnv.executeSql("SELECT * FROM jdbDynamicTableSource LIMIT 1").collect()).stream().map((v0) -> {
            return v0.toString();
        }).sorted().collect(Collectors.toList());
        HashSet hashSet = new HashSet();
        hashSet.add("+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]");
        hashSet.add("+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]");
        Assertions.assertThat(list).hasSize(1);
        Assertions.assertThat(hashSet).as("The actual output is not a subset of the expected set.", new Object[0]).containsAll(list);
    }

    @EnumSource(Caching.class)
    @ParameterizedTest
    void testLookupJoin(Caching caching) throws Exception {
        tEnv.executeSql(String.format("CREATE TABLE jdbc_lookup (id BIGINT,timestamp6_col TIMESTAMP(6),timestamp9_col TIMESTAMP(9),time_col TIME,real_col FLOAT,double_col DOUBLE,decimal_col DECIMAL(10, 4)) WITH (  %s  'connector' = 'jdbc',  'url' = '%s',  'table-name' = '%s')", caching.equals(Caching.ENABLE_CACHE) ? "'lookup.cache.max-rows' = '100', \n'lookup.cache.ttl' = '10min'," : "", DB_URL, INPUT_TABLE));
        tEnv.executeSql(String.format("CREATE TABLE value_source (\n`id` BIGINT,\n`name` STRING,\n`proctime` AS PROCTIME()\n) WITH (\n'connector' = 'values', \n'data-id' = '%s')", TestValuesTableFactory.registerData(Arrays.asList(Row.of(new Object[]{1L, "Alice"}), Row.of(new Object[]{1L, "Alice"}), Row.of(new Object[]{2L, "Bob"}), Row.of(new Object[]{3L, "Charlie"})))));
        if (caching == Caching.ENABLE_CACHE) {
            LookupCacheManager.keepCacheOnRelease(true);
        }
        try {
            CloseableIterator collect = tEnv.executeSql("SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id").collect();
            Throwable th = null;
            try {
                try {
                    List list = (List) CollectionUtil.iteratorToList(collect).stream().map((v0) -> {
                        return v0.toString();
                    }).sorted().collect(Collectors.toList());
                    ArrayList arrayList = new ArrayList();
                    arrayList.add("+I[1, Alice, 1, 2020-01-01T15:35:00.123456, 100.1234]");
                    arrayList.add("+I[1, Alice, 1, 2020-01-01T15:35:00.123456, 100.1234]");
                    arrayList.add("+I[2, Bob, 2, 2020-01-01T15:36:01.123456, 101.1234]");
                    Assertions.assertThat(list).hasSize(3);
                    Assertions.assertThat(arrayList).as("The actual output is not a subset of the expected set", new Object[0]).containsAll(arrayList);
                    if (caching == Caching.ENABLE_CACHE) {
                        Map managedCaches = LookupCacheManager.getInstance().getManagedCaches();
                        Assertions.assertThat(managedCaches).as("There should be only 1 shared cache registered", new Object[0]).hasSize(1);
                        validateCachedValues(((LookupCacheManager.RefCountedCache) managedCaches.get(managedCaches.keySet().iterator().next())).getCache());
                    }
                    if (collect != null) {
                        if (0 != 0) {
                            try {
                                collect.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            collect.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } finally {
            if (caching == Caching.ENABLE_CACHE) {
                LookupCacheManager.getInstance().checkAllReleased();
                LookupCacheManager.getInstance().clear();
                LookupCacheManager.keepCacheOnRelease(false);
            }
        }
    }

    private void validateCachedValues(LookupCache lookupCache) {
        GenericRowData of = GenericRowData.of(new Object[]{1L});
        GenericRowData of2 = GenericRowData.of(new Object[]{1L, TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), Double.valueOf("1.79769E308")});
        GenericRowData of3 = GenericRowData.of(new Object[]{2L});
        GenericRowData of4 = GenericRowData.of(new Object[]{2L, TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), Double.valueOf("-1.79769E308")});
        GenericRowData of5 = GenericRowData.of(new Object[]{3L});
        HashMap hashMap = new HashMap();
        hashMap.put(of, Collections.singletonList(of2));
        hashMap.put(of3, Collections.singletonList(of4));
        hashMap.put(of5, Collections.emptyList());
        LookupCacheAssert.assertThat(lookupCache).containsExactlyEntriesOf(hashMap);
    }
}
