package org.apache.flink.connector.mongodb.table;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonBoolean;
import org.bson.BsonDateTime;
import org.bson.BsonDecimal128;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.types.Decimal128;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* Integration tests for reading the MongoDB dynamic table: full scan, projection, limit, lookup join and filter queries. */
class MongoDynamicTableSourceITCase {
    /** Logger of this test class; the MongoDB container output is forwarded to it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSourceITCase.class);

    /** Flink mini cluster extension configured with a single task manager. */
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).build());

    /** MongoDB test container created by {@link MongoTestUtil}, logging through {@link #LOG}. */
    @Container
    private static final MongoDBContainer MONGO_CONTAINER = MongoTestUtil.createMongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG));

    /** Database that holds the test collection. */
    private static final String TEST_DATABASE = "test";

    /** Collection the test documents are inserted into and the table is mapped to. */
    private static final String TEST_COLLECTION = "mongo_table_source";

    /** Client opened in {@code beforeAll} and closed in {@code afterAll}. */
    private static MongoClient mongoClient;

    /** Stream environment, re-created before each test. */
    private static StreamExecutionEnvironment env;

    /** Table environment on top of {@link #env}, re-created before each test. */
    private static StreamTableEnvironment tEnv;

    /** Test parameter of {@code testLookupJoin}: whether the partial lookup cache options are set on the table. */
    private enum Caching {
        ENABLE_CACHE,
        DISABLE_CACHE
    }

    // Explicit package-private no-argument constructor; it performs no initialization.
    MongoDynamicTableSourceITCase() {
    }

    /** Opens a client against the MongoDB container and inserts the test documents into the test collection. */
    @BeforeAll
    static void beforeAll() {
        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
        List<BsonDocument> documents = createTestData();
        mongoClient
                .getDatabase(TEST_DATABASE)
                .getCollection(TEST_COLLECTION)
                .withDocumentClass(BsonDocument.class)
                .insertMany(documents);
    }

    /** Closes the MongoDB client if one was created. */
    @AfterAll
    static void afterAll() {
        if (mongoClient == null) {
            // Nothing to release: the client was never opened.
            return;
        }
        mongoClient.close();
    }

    /** Creates fresh stream and table environments for each test and sets the session time zone to UTC. */
    @BeforeEach
    void before() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        tEnv = StreamTableEnvironment.create(env);
        ZoneId utc = ZoneId.of("UTC");
        tEnv.getConfig().setLocalTimeZone(utc);
    }

    /** Reads every column of every document and compares the result with {@link #expectedRows()}. */
    @Test
    void testSource() {
        String ddl = createTestDDl(null);
        tEnv.executeSql(ddl);
        List<Row> actualRows = executeQueryToList("SELECT * FROM mongo_source");
        Assertions.assertThat(actualRows).isEqualTo(expectedRows());
    }

    /** Selects only the string column {@code f1} and the nested row column {@code f10}. */
    @Test
    void testProject() {
        String ddl = createTestDDl(null);
        tEnv.executeSql(ddl);
        List<Row> actualRows = executeQueryToList("SELECT f1, f10 FROM mongo_source");
        List<Row> projectedRows = Arrays.asList(
                Row.of("2", Row.of(13)),
                Row.of("3", Row.of(14)));
        Assertions.assertThat(actualRows).isEqualTo(projectedRows);
    }

    /** Runs a {@code LIMIT 1} query and checks that exactly one of the expected rows is returned. */
    @Test
    void testLimit() {
        String ddl = createTestDDl(null);
        tEnv.executeSql(ddl);
        List<Row> limitedRows = executeQueryToList("SELECT * FROM mongo_source LIMIT 1");
        Assertions.assertThat(limitedRows)
                .hasSize(1)
                .containsAnyElementsOf(expectedRows());
    }

    /**
     * Joins an in-memory stream against the MongoDB table with a processing-time lookup join and
     * checks the joined rows. When caching is enabled, the content of the single managed lookup
     * cache is validated as well.
     *
     * @param caching whether the table is created with the partial lookup cache options
     * @throws Exception if closing the result iterator fails
     */
    @EnumSource(Caching.class)
    @ParameterizedTest
    void testLookupJoin(Caching caching) throws Exception {
        boolean cacheEnabled = caching == Caching.ENABLE_CACHE;

        Map<String, String> lookupOptions = new HashMap<>();
        if (cacheEnabled) {
            lookupOptions.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL");
            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "10min");
            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "10min");
            lookupOptions.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "100");
            lookupOptions.put(LookupOptions.MAX_RETRIES.key(), "10");
        }
        tEnv.executeSql(createTestDDl(lookupOptions));

        // Probe side of the join: id 1 appears twice, id 3 has no matching document.
        tEnv.createTemporaryView(
                "value_source",
                env.fromCollection(
                                Arrays.asList(
                                        Row.of(1L, "Alice"),
                                        Row.of(1L, "Alice"),
                                        Row.of(2L, "Bob"),
                                        Row.of(3L, "Charlie")))
                        .returns(
                                new RowTypeInfo(
                                        new TypeInformation<?>[] {Types.LONG, Types.STRING},
                                        new String[] {"id", "name"})),
                Schema.newBuilder()
                        .column("id", DataTypes.BIGINT())
                        .column("name", DataTypes.STRING())
                        .columnByExpression("proctime", "PROCTIME()")
                        .build());

        if (cacheEnabled) {
            // Keep the cache after the job releases it so it can be inspected below.
            LookupCacheManager.keepCacheOnRelease(true);
        }

        try (CloseableIterator<Row> iterator =
                executeQuery(
                        "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM value_source AS S JOIN mongo_source for system_time as of S.proctime AS D ON S.id = D._id")) {
            List<Row> actualRows = CollectionUtil.iteratorToList(iterator);
            List<Row> joinedRows =
                    Arrays.asList(
                            Row.of(1L, "Alice", 1L, "2", true),
                            Row.of(1L, "Alice", 1L, "2", true),
                            Row.of(2L, "Bob", 2L, "3", false));
            Assertions.assertThat(actualRows).hasSize(3);
            Assertions.assertThat(actualRows).isEqualTo(joinedRows);

            if (cacheEnabled) {
                Map<String, LookupCacheManager.RefCountedCache> managedCaches =
                        LookupCacheManager.getInstance().getManagedCaches();
                Assertions.assertThat(managedCaches).hasSize(1);
                LookupCache cache = managedCaches.values().iterator().next().getCache();
                validateCachedValues(cache);
            }
        } finally {
            if (cacheEnabled) {
                // Reset the shared cache manager so later tests start from a clean state.
                LookupCacheManager.getInstance().checkAllReleased();
                LookupCacheManager.getInstance().clear();
                LookupCacheManager.keepCacheOnRelease(false);
            }
        }
    }

    /**
     * Runs a series of filtered queries under each filter handling policy and checks which of the
     * two test rows each predicate returns.
     *
     * @param filterHandlingPolicy policy written into the table options
     */
    @EnumSource(FilterHandlingPolicy.class)
    @ParameterizedTest
    void testFilter(FilterHandlingPolicy filterHandlingPolicy) {
        Map<String, String> policyOption =
                Collections.singletonMap(
                        MongoConnectorOptions.FILTER_HANDLING_POLICY.key(),
                        filterHandlingPolicy.name());
        tEnv.executeSql(createTestDDl(policyOption));
        // The view renames _id to idx; most predicates below go through it.
        tEnv.executeSql("CREATE VIEW fake_table (idx, f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12) as (SELECT * from mongo_source )");

        List<Row> allRows = executeQueryToList("SELECT * FROM mongo_source");
        Assertions.assertThat(allRows).hasSize(2);

        Row onlyRow1 =
                allRows.stream()
                        .filter(candidate -> candidate.getFieldAs(0).equals(1L))
                        .findAny()
                        .orElseThrow(NullPointerException::new);
        Row onlyRow2 =
                allRows.stream()
                        .filter(candidate -> candidate.getFieldAs(0).equals(2L))
                        .findAny()
                        .orElseThrow(NullPointerException::new);

        // Equality, temporal literal, IN and NOT IN predicates.
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE 1 = idx")).containsExactly(onlyRow1);
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE f5 = TIMESTAMP '2022-09-07 10:25:28.127'")).containsExactly(onlyRow1);
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE idx IN (2, 3)")).containsExactly(onlyRow2);
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE f7 NOT IN (CAST(1.0 AS DOUBLE), CAST(1.1 AS DOUBLE))")).containsExactly(onlyRow1);

        // Combinations of AND / OR, null checks and range comparisons.
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE idx <> 1 OR f8 = 1.10")).containsExactlyInAnyOrderElementsOf(allRows);
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE (f0 IS NOT NULL AND f2 IS TRUE) OR f8 = 102.2")).containsExactly(onlyRow1);
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE f8 > 1.09 AND f8 < 1.11")).containsExactly(onlyRow1);
        Assertions.assertThat(executeQueryToList("SELECT * FROM fake_table WHERE f0 IS NULL AND (f8 >= 1.11 OR f4 <= 5)")).containsExactly(onlyRow2);

        // Predicates issued directly against the table instead of the view.
        Assertions.assertThat(executeQueryToList("SELECT * FROM mongo_source WHERE _id = 2 AND f7 > 0.8 OR f7 < 1.1")).containsExactlyInAnyOrderElementsOf(allRows);
        Assertions.assertThat(executeQueryToList("SELECT * FROM mongo_source WHERE 1 = _id AND f1 NOT IN ('2', '3')")).isEmpty();
    }

    /**
     * Asserts that the lookup cache holds exactly one entry per looked-up key: keys 1 and 2 map
     * to their single joined row, key 3 maps to an empty list.
     *
     * @param lookupCache cache to inspect
     */
    private static void validateCachedValues(LookupCache lookupCache) {
        RowData keyOne = GenericRowData.of(1L);
        RowData valueOne = GenericRowData.of(1L, StringData.fromString("2"), true);

        RowData keyTwo = GenericRowData.of(2L);
        RowData valueTwo = GenericRowData.of(2L, StringData.fromString("3"), false);

        RowData keyThree = GenericRowData.of(3L);

        Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
        expectedEntries.put(keyOne, Collections.singletonList(valueOne));
        expectedEntries.put(keyTwo, Collections.singletonList(valueTwo));
        // Key 3 was looked up but expects no cached rows.
        expectedEntries.put(keyThree, Collections.emptyList());

        LookupCacheAssert.assertThat(lookupCache).containsExactlyEntriesOf(expectedEntries);
    }

    /**
     * Builds the {@code CREATE TABLE mongo_source} statement for the test collection.
     *
     * @param map additional table options that are added to, or override, the default connector
     *     options; may be {@code null}
     * @return the DDL statement, one clause per line
     */
    private static String createTestDDl(Map<String, String> map) {
        Map<String, String> options = new HashMap<>();
        // NOTE(review): the connector identifier is taken from the MONGODB_HOSTNAME constant;
        // presumably both share the same value - confirm, a dedicated identifier would be clearer.
        options.put(FactoryUtil.CONNECTOR.key(), MongoTestUtil.MONGODB_HOSTNAME);
        options.put(MongoConnectorOptions.URI.key(), MONGO_CONTAINER.getConnectionString());
        options.put(MongoConnectorOptions.DATABASE.key(), TEST_DATABASE);
        options.put(MongoConnectorOptions.COLLECTION.key(), TEST_COLLECTION);
        if (map != null) {
            options.putAll(map);
        }

        // Render every option as 'key' = 'value', one per line.
        String withClause =
                options.entrySet().stream()
                        .map(entry -> String.format("'%s' = '%s'", entry.getKey(), entry.getValue()))
                        .collect(Collectors.joining(",\n"));

        return String.join(
                "\n",
                "CREATE TABLE mongo_source",
                "(",
                "  _id BIGINT,",
                "  f0 STRING,",
                "  f1 STRING,",
                "  f2 BOOLEAN,",
                "  f3 BINARY,",
                "  f4 INTEGER,",
                "  f5 TIMESTAMP_LTZ(3),",
                "  f6 TIMESTAMP_LTZ(0),",
                "  f7 DOUBLE,",
                "  f8 DECIMAL(10, 2),",
                "  f9 MAP<STRING, INTEGER>,",
                "  f10 ROW<k INTEGER>,",
                "  f11 ARRAY<STRING>,",
                "  f12 ARRAY<ROW<k STRING>>",
                ") WITH (",
                withClause,
                ")");
    }

    /**
     * Returns the two table rows that correspond to the documents built by
     * {@link #createTestData()}, in the column order of the table definition.
     */
    private static List<Row> expectedRows() {
        Row first =
                Row.of(
                        1L,
                        "",
                        "2",
                        true,
                        new byte[] {3},
                        6,
                        Instant.ofEpochMilli(1662546328127L),
                        Instant.ofEpochSecond(1662546328L),
                        0.9d,
                        new BigDecimal("1.10"),
                        Collections.singletonMap("k", 12),
                        Row.of(13),
                        new String[] {"11_1", "11_2"},
                        new Row[] {Row.of("12_1"), Row.of("12_2")});
        Row second =
                Row.of(
                        2L,
                        null,
                        "3",
                        false,
                        new byte[] {4},
                        7,
                        Instant.ofEpochMilli(1662546328128L),
                        Instant.ofEpochSecond(1662546329L),
                        1.0d,
                        new BigDecimal("1.11"),
                        Collections.singletonMap("k", 13),
                        Row.of(14),
                        new String[] {"11_3", "11_4"},
                        new Row[] {Row.of("12_3"), Row.of("12_4")});
        return Arrays.asList(first, second);
    }

    /**
     * Builds the two BSON documents inserted into the test collection. Each document has an
     * {@code _id} and the fields {@code f0} to {@code f12}; {@code f0} is an empty string in the
     * first document and a BSON null in the second.
     */
    private static List<BsonDocument> createTestData() {
        BsonDocument first =
                new BsonDocument()
                        .append("_id", new BsonInt64(1L))
                        .append("f0", new BsonString(""))
                        .append("f1", new BsonString("2"))
                        .append("f2", BsonBoolean.TRUE)
                        .append("f3", new BsonBinary(new byte[] {3}))
                        .append("f4", new BsonInt32(6))
                        .append("f5", new BsonDateTime(1662546328127L))
                        .append("f6", new BsonTimestamp(1662546328, 0))
                        .append("f7", new BsonDouble(0.9d))
                        .append("f8", new BsonDecimal128(new Decimal128(new BigDecimal("1.10"))))
                        .append("f9", new BsonDocument("k", new BsonInt32(12)))
                        .append("f10", new BsonDocument("k", new BsonInt32(13)))
                        .append("f11", new BsonArray(Arrays.asList(new BsonString("11_1"), new BsonString("11_2"))))
                        .append("f12", new BsonArray(Arrays.asList(new BsonDocument("k", new BsonString("12_1")), new BsonDocument("k", new BsonString("12_2")))));
        BsonDocument second =
                new BsonDocument()
                        .append("_id", new BsonInt64(2L))
                        .append("f0", BsonNull.VALUE)
                        .append("f1", new BsonString("3"))
                        .append("f2", BsonBoolean.FALSE)
                        .append("f3", new BsonBinary(new byte[] {4}))
                        .append("f4", new BsonInt32(7))
                        .append("f5", new BsonDateTime(1662546328128L))
                        .append("f6", new BsonTimestamp(1662546329, 0))
                        .append("f7", new BsonDouble(1.0d))
                        .append("f8", new BsonDecimal128(new Decimal128(new BigDecimal("1.11"))))
                        .append("f9", new BsonDocument("k", new BsonInt32(13)))
                        .append("f10", new BsonDocument("k", new BsonInt32(14)))
                        .append("f11", new BsonArray(Arrays.asList(new BsonString("11_3"), new BsonString("11_4"))))
                        .append("f12", new BsonArray(Arrays.asList(new BsonDocument("k", new BsonString("12_3")), new BsonDocument("k", new BsonString("12_4")))));
        return Arrays.asList(first, second);
    }

    /**
     * Executes the statement and drains its result iterator into a list.
     *
     * @param str SQL statement to run
     * @return all rows produced by the statement
     */
    private static List<Row> executeQueryToList(String str) {
        CloseableIterator<Row> resultIterator = executeQuery(str);
        return CollectionUtil.iteratorToList(resultIterator);
    }

    /**
     * Executes the statement on the shared table environment and returns an iterator over its
     * result rows. The caller owns the returned iterator.
     *
     * @param str SQL statement to run
     * @return closeable iterator over the result rows
     */
    private static CloseableIterator<Row> executeQuery(String str) {
        return tEnv.executeSql(str).collect();
    }
}
