package org.apache.flink.table.runtime.functions.table.fullcache.inputformat;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.functions.table.fullcache.TestCacheLoader;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.class */
class InputFormatCacheLoaderTest {
    private static final int DEFAULT_NUM_SPLITS = 2;
    private static final int DEFAULT_DELTA_NUM_SPLITS = 0;

    InputFormatCacheLoaderTest() {
    }

    @BeforeEach
    void resetCounter() {
        FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(DEFAULT_DELTA_NUM_SPLITS);
    }

    @AfterEach
    void checkCounter() {
        Assertions.assertThat(FullCacheTestInputFormat.OPEN_CLOSED_COUNTER).hasValue(DEFAULT_DELTA_NUM_SPLITS);
    }

    @MethodSource({"deltaNumSplits"})
    @ParameterizedTest
    void testReadWithDifferentSplits(int i) throws Exception {
        InputFormatCacheLoader createCacheLoader = createCacheLoader(i);
        Throwable th = DEFAULT_DELTA_NUM_SPLITS;
        try {
            try {
                createCacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
                reloadSynchronously(createCacheLoader);
                ConcurrentHashMap cache = createCacheLoader.getCache();
                assertCacheContent(cache);
                reloadSynchronously(createCacheLoader);
                Assertions.assertThat(createCacheLoader.getCache()).as("A new instance of cache should be present after reload.", new Object[DEFAULT_DELTA_NUM_SPLITS]).isNotSameAs(cache);
                ConcurrentHashMap cache2 = createCacheLoader.getCache();
                if (createCacheLoader != null) {
                    if (th != null) {
                        try {
                            createCacheLoader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createCacheLoader.close();
                    }
                }
                Assertions.assertThat(cache2.size()).as("Cache should be cleared after close.", new Object[DEFAULT_DELTA_NUM_SPLITS]).isZero();
            } finally {
            }
        } catch (Throwable th3) {
            if (createCacheLoader != null) {
                if (th != null) {
                    try {
                        createCacheLoader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createCacheLoader.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testCacheMetrics() throws Exception {
        InputFormatCacheLoader createCacheLoader = createCacheLoader(DEFAULT_DELTA_NUM_SPLITS);
        Throwable th = null;
        try {
            InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
            createCacheLoader.initializeMetrics(interceptingCacheMetricGroup);
            Assertions.assertThat(interceptingCacheMetricGroup.loadCounter).isNotNull();
            Assertions.assertThat(interceptingCacheMetricGroup.loadCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(interceptingCacheMetricGroup.numLoadFailuresCounter).isNotNull();
            Assertions.assertThat(interceptingCacheMetricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(interceptingCacheMetricGroup.numCachedRecordsGauge).isNotNull();
            Assertions.assertThat((Long) interceptingCacheMetricGroup.numCachedRecordsGauge.getValue()).isEqualTo(0L);
            Assertions.assertThat(interceptingCacheMetricGroup.latestLoadTimeGauge).isNotNull();
            Assertions.assertThat((Long) interceptingCacheMetricGroup.latestLoadTimeGauge.getValue()).isEqualTo(-1L);
            Assertions.assertThat(interceptingCacheMetricGroup.hitCounter).isNull();
            Assertions.assertThat(interceptingCacheMetricGroup.missCounter).isNull();
            Assertions.assertThat(interceptingCacheMetricGroup.numCachedBytesGauge).isNull();
            reloadSynchronously(createCacheLoader);
            Assertions.assertThat(interceptingCacheMetricGroup.loadCounter.getCount()).isEqualTo(1L);
            Assertions.assertThat((Long) interceptingCacheMetricGroup.latestLoadTimeGauge.getValue()).isNotEqualTo(-1L);
            Assertions.assertThat((Long) interceptingCacheMetricGroup.numCachedRecordsGauge.getValue()).isEqualTo(TestCacheLoader.DATA.size());
            if (createCacheLoader != null) {
                if (DEFAULT_DELTA_NUM_SPLITS == 0) {
                    createCacheLoader.close();
                    return;
                }
                try {
                    createCacheLoader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createCacheLoader != null) {
                if (DEFAULT_DELTA_NUM_SPLITS != 0) {
                    try {
                        createCacheLoader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createCacheLoader.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testExceptionDuringReload() throws Exception {
        RuntimeException runtimeException = new RuntimeException("Load failed.");
        InputFormatCacheLoader createCacheLoader = createCacheLoader(DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS, () -> {
            throw runtimeException;
        });
        Throwable th = DEFAULT_DELTA_NUM_SPLITS;
        try {
            try {
                InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
                createCacheLoader.initializeMetrics(interceptingCacheMetricGroup);
                Assertions.assertThatThrownBy(() -> {
                    reloadSynchronously(createCacheLoader);
                }).hasRootCause(runtimeException);
                Assertions.assertThat(interceptingCacheMetricGroup.loadCounter.getCount()).isEqualTo(0L);
                Assertions.assertThat(interceptingCacheMetricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1L);
                if (createCacheLoader != null) {
                    if (th == null) {
                        createCacheLoader.close();
                        return;
                    }
                    try {
                        createCacheLoader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createCacheLoader != null) {
                if (th != null) {
                    try {
                        createCacheLoader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createCacheLoader.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"numSplits"})
    @ParameterizedTest
    void testCloseDuringReload(int i) throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        Runnable runnable = () -> {
            oneShotLatch.trigger();
            Assertions.assertThatThrownBy(() -> {
                new OneShotLatch().await();
            }).as("Wait should be interrupted if everything works ok", new Object[DEFAULT_DELTA_NUM_SPLITS]).isInstanceOf(InterruptedException.class);
            Thread.currentThread().interrupt();
        };
        InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
        InputFormatCacheLoader createCacheLoader = createCacheLoader(i, DEFAULT_DELTA_NUM_SPLITS, runnable);
        Throwable th = DEFAULT_DELTA_NUM_SPLITS;
        try {
            try {
                createCacheLoader.initializeMetrics(interceptingCacheMetricGroup);
                CompletableFuture reloadAsync = createCacheLoader.reloadAsync();
                oneShotLatch.await();
                if (createCacheLoader != null) {
                    if (th != null) {
                        try {
                            createCacheLoader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createCacheLoader.close();
                    }
                }
                ((AbstractBooleanAssert) Assertions.assertThat(reloadAsync.isDone()).as("The reload future should still complete successfully indicating that the reload was intentionally stopped without an error.", new Object[DEFAULT_DELTA_NUM_SPLITS])).isTrue();
                Assertions.assertThat(interceptingCacheMetricGroup.loadCounter.getCount()).isEqualTo(0L);
                Assertions.assertThat(interceptingCacheMetricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
            } finally {
            }
        } catch (Throwable th3) {
            if (createCacheLoader != null) {
                if (th != null) {
                    try {
                        createCacheLoader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createCacheLoader.close();
                }
            }
            throw th3;
        }
    }

    static Stream<Arguments> numSplits() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{1}), Arguments.of(new Object[]{Integer.valueOf(DEFAULT_NUM_SPLITS)})});
    }

    static Stream<Arguments> deltaNumSplits() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{-1}), Arguments.of(new Object[]{Integer.valueOf(DEFAULT_DELTA_NUM_SPLITS)}), Arguments.of(new Object[]{1})});
    }

    private void assertCacheContent(Map<RowData, Collection<RowData>> map) {
        Assertions.assertThat(map).containsOnlyKeys(TestCacheLoader.DATA.keySet());
        TestCacheLoader.DATA.forEach((rowData, collection) -> {
            Assertions.assertThat(collection).containsExactlyInAnyOrderElementsOf((Iterable) map.get(rowData));
        });
    }

    private void reloadSynchronously(CacheLoader cacheLoader) {
        cacheLoader.reloadAsync().join();
    }

    private InputFormatCacheLoader createCacheLoader(int i) throws Exception {
        return createCacheLoader(DEFAULT_NUM_SPLITS, i, () -> {
        });
    }

    private InputFormatCacheLoader createCacheLoader(int i, int i2, final Runnable runnable) throws Exception {
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.INT()), DataTypes.FIELD("f1", DataTypes.STRING().bridgedTo(String.class))});
        RowDataSerializer create = InternalSerializers.create(ROW.getLogicalType());
        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter((DataType[]) ROW.getChildren().toArray(new DataType[DEFAULT_DELTA_NUM_SPLITS]));
        Stream stream = (Stream) TestCacheLoader.DATA.values().stream().map((v0) -> {
            return v0.stream();
        }).reduce(Stream.empty(), Stream::concat);
        rowConverter.getClass();
        FullCacheTestInputFormat fullCacheTestInputFormat = new FullCacheTestInputFormat((Collection) stream.map((v1) -> {
            return r1.toExternal(v1);
        }).collect(Collectors.toList()), Optional.empty(), rowConverter, i, i2);
        RowType logicalType = DataTypes.ROW(new DataType[]{DataTypes.INT()}).getLogicalType();
        InputFormatCacheLoader inputFormatCacheLoader = new InputFormatCacheLoader(fullCacheTestInputFormat, new GenericRowDataKeySelector(InternalTypeInfo.of(logicalType), InternalSerializers.create(logicalType), new GeneratedProjection("", "", new Object[DEFAULT_DELTA_NUM_SPLITS]) { // from class: org.apache.flink.table.runtime.functions.table.fullcache.inputformat.InputFormatCacheLoaderTest.1
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public Projection m7newInstance(ClassLoader classLoader) {
                Runnable runnable2 = runnable;
                return rowData -> {
                    runnable2.run();
                    return StreamRecordUtils.row(Integer.valueOf(rowData.getInt(InputFormatCacheLoaderTest.DEFAULT_DELTA_NUM_SPLITS)));
                };
            }
        }), create);
        inputFormatCacheLoader.open(new Configuration(), Thread.currentThread().getContextClassLoader());
        return inputFormatCacheLoader;
    }
}
