package org.apache.flink.table.runtime.functions.table.fullcache.inputformat;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.functions.table.fullcache.TestCacheLoader;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.NotThrownAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.class */
/**
 * Tests for {@link InputFormatCacheLoader}.
 *
 * <p>Every test starts with {@link FullCacheTestInputFormat#OPEN_CLOSED_COUNTER} reset to zero and
 * ends by asserting that the counter is zero again.
 */
class InputFormatCacheLoaderTest {

    /** Resets the shared open/close counter of the test input format before each test. */
    @BeforeEach
    void resetCounter() {
        FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0);
    }

    /** Asserts that the shared open/close counter is back at zero once the test has finished. */
    @AfterEach
    void checkCounter() {
        Assertions.assertThat(FullCacheTestInputFormat.OPEN_CLOSED_COUNTER).hasValue(0);
    }

    /**
     * Runs the loader twice and checks that the cache holds the expected content, that a second
     * run produces a new cache instance, and that closing the loader leaves an empty cache.
     *
     * @param deltaNumSplits value forwarded to the test input format (see {@link #deltaNumSplits()})
     */
    @MethodSource({"deltaNumSplits"})
    @ParameterizedTest
    void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
        cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());

        cacheLoader.run();
        ConcurrentHashMap<RowData, Collection<RowData>> firstCache = cacheLoader.getCache();
        assertCacheContent(firstCache);

        // A second run must swap in a different map instance rather than reuse the first one.
        cacheLoader.run();
        Assertions.assertThat(cacheLoader.getCache()).isNotSameAs(firstCache);

        cacheLoader.close();
        Assertions.assertThat(cacheLoader.getCache().size()).isZero();
    }

    /** Checks the metrics registered on {@code open} and their values after one load. */
    @Test
    void testCacheMetrics() throws Exception {
        InputFormatCacheLoader cacheLoader = createCacheLoader(0);
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        cacheLoader.open(metricGroup);

        // Metrics that must be registered, with their initial values.
        Assertions.assertThat(metricGroup.loadCounter).isNotNull();
        Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter).isNotNull();
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.numCachedRecordsGauge).isNotNull();
        Assertions.assertThat((Long) metricGroup.numCachedRecordsGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.latestLoadTimeGauge).isNotNull();
        Assertions.assertThat((Long) metricGroup.latestLoadTimeGauge.getValue()).isEqualTo(-1L);

        // Metrics that must stay unregistered.
        Assertions.assertThat(metricGroup.hitCounter).isNull();
        Assertions.assertThat(metricGroup.missCounter).isNull();
        Assertions.assertThat(metricGroup.numCachedBytesGauge).isNull();

        cacheLoader.run();

        Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1L);
        Assertions.assertThat((Long) metricGroup.latestLoadTimeGauge.getValue()).isNotEqualTo(-1L);
        Assertions.assertThat((Long) metricGroup.numCachedRecordsGauge.getValue())
                .isEqualTo(TestCacheLoader.DATA.size());
    }

    /**
     * Checks that an exception thrown by the key projection surfaces from {@code run} as the root
     * cause and increments the load-failure counter.
     */
    @Test
    void testExceptionDuringReload() throws Exception {
        RuntimeException loadFailure = new RuntimeException("Load failed.");
        InputFormatCacheLoader cacheLoader =
                createCacheLoader(
                        0,
                        () -> {
                            throw loadFailure;
                        });
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        cacheLoader.open(metricGroup);

        Assertions.assertThatThrownBy(cacheLoader::run).hasRootCause(loadFailure);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1L);
    }

    /**
     * Checks that a running load stops early, without throwing and without counting a load
     * failure, both when its thread is interrupted and when the loader is closed.
     */
    @Test
    void testCloseAndInterruptDuringReload() throws Exception {
        AtomicInteger recordsCounter = new AtomicInteger(0);
        // NOTE(review): presumably the total number of records in TestCacheLoader.DATA - confirm.
        int totalRecords = TestCacheLoader.DATA.size() + 1;
        // Each projected row is counted and then delayed so the load is still in flight when it
        // gets interrupted or closed.
        InputFormatCacheLoader cacheLoader =
                createCacheLoader(
                        0,
                        ThrowingRunnable.unchecked(
                                () -> {
                                    recordsCounter.incrementAndGet();
                                    Thread.sleep(1000L);
                                }));
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        cacheLoader.open(metricGroup);

        // Interruption: shutdownNow() interrupts the worker thread running the load.
        ExecutorService interruptExecutor = Executors.newSingleThreadExecutor();
        Future<?> interruptedLoad = interruptExecutor.submit(cacheLoader);
        interruptExecutor.shutdownNow();
        Assertions.assertThatNoException().isThrownBy(interruptedLoad::get);
        Assertions.assertThat(recordsCounter).hasValueLessThan(totalRecords);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);

        // Closing: close() is called while the load is submitted to a fresh executor.
        recordsCounter.set(0);
        ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> closedLoad = closeExecutor.submit(cacheLoader);
            cacheLoader.close();
            Assertions.assertThatNoException().isThrownBy(closedLoad::get);
            Assertions.assertThat(recordsCounter).hasValueLessThan(totalRecords);
            Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
        } finally {
            // Release the worker thread; the executor was previously left running.
            closeExecutor.shutdownNow();
        }
    }

    /** Supplies the arguments {@code -1}, {@code 0} and {@code 1} for the parameterized test. */
    static Stream<Arguments> deltaNumSplits() {
        return Stream.of(Arguments.of(-1), Arguments.of(0), Arguments.of(1));
    }

    /**
     * Asserts that {@code map} has exactly the keys of {@link TestCacheLoader#DATA} and that each
     * key maps to the same rows, ignoring order.
     */
    private void assertCacheContent(Map<RowData, Collection<RowData>> map) {
        Assertions.assertThat(map).containsOnlyKeys(TestCacheLoader.DATA.keySet());
        TestCacheLoader.DATA.forEach(
                (key, expectedRows) ->
                        Assertions.assertThat(expectedRows)
                                .containsExactlyInAnyOrderElementsOf(map.get(key)));
    }

    /** Creates a loader whose key projection performs no additional action. */
    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits) throws Exception {
        return createCacheLoader(deltaNumSplits, () -> {});
    }

    /**
     * Creates a loader that reads all rows of {@link TestCacheLoader#DATA} through a
     * {@link FullCacheTestInputFormat} and is already opened with an empty {@link Configuration}.
     *
     * @param deltaNumSplits value passed as the last constructor argument of the input format
     * @param reloadAction hook executed by the key projection for every row before the key is
     *     extracted
     */
    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits, final Runnable reloadAction)
            throws Exception {
        DataType rowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.INT()),
                        DataTypes.FIELD("f1", DataTypes.STRING().bridgedTo(String.class)));
        RowDataSerializer cacheEntriesSerializer =
                (RowDataSerializer)
                        InternalSerializers.<RowData>create(rowDataType.getLogicalType());
        DataFormatConverters.RowConverter rowConverter =
                new DataFormatConverters.RowConverter(
                        rowDataType.getChildren().toArray(new DataType[0]));

        // Flatten all cached rows and convert them to the external format the input format reads.
        FullCacheTestInputFormat inputFormat =
                new FullCacheTestInputFormat(
                        TestCacheLoader.DATA.values().stream()
                                .flatMap(Collection::stream)
                                .map(rowConverter::toExternal)
                                .collect(Collectors.toList()),
                        Optional.empty(),
                        rowConverter,
                        deltaNumSplits);

        RowType keyType = (RowType) DataTypes.ROW(DataTypes.INT()).getLogicalType();
        GeneratedProjection keyProjection =
                new GeneratedProjection("", "", new Object[0]) {
                    // Must be named newInstance so it replaces the code-generating default.
                    @Override
                    public Projection newInstance(ClassLoader classLoader) {
                        return row -> {
                            reloadAction.run();
                            // The key is the first (INT) field of the row.
                            return StreamRecordUtils.row(Integer.valueOf(row.getInt(0)));
                        };
                    }
                };
        GenericRowDataKeySelector keySelector =
                new GenericRowDataKeySelector(
                        InternalTypeInfo.of(keyType),
                        InternalSerializers.create(keyType),
                        keyProjection);

        InputFormatCacheLoader cacheLoader =
                new InputFormatCacheLoader(inputFormat, keySelector, cacheEntriesSerializer);
        cacheLoader.open(new Configuration());
        return cacheLoader;
    }
}
