package org.apache.flink.test.state;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.rocksdb.Cache;
import org.rocksdb.WriteBufferManager;

/* loaded from: input_file:org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.class */
public class TaskManagerWideRocksDbMemorySharingITCase extends TestLogger {
    private static final int PARALLELISM = 4;
    private static final int NUMBER_OF_JOBS = 5;
    private static final int NUMBER_OF_TASKS = 20;
    private static final MemorySize SHARED_MEMORY = MemorySize.ofMebiBytes(500);
    private MiniClusterWithClientResource cluster;

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /* loaded from: input_file:org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase$TestingRocksDBMemoryFactory.class */
    private static class TestingRocksDBMemoryFactory implements RocksDBMemoryControllerUtils.RocksDBMemoryFactory {
        private final SharedReference<List<Cache>> createdCaches;
        private final SharedReference<List<WriteBufferManager>> createdWriteBufferManagers;

        private TestingRocksDBMemoryFactory(SharedReference<List<Cache>> sharedReference, SharedReference<List<WriteBufferManager>> sharedReference2) {
            this.createdCaches = sharedReference;
            this.createdWriteBufferManagers = sharedReference2;
        }

        public Cache createCache(long j, double d) {
            Cache createCache = RocksDBMemoryControllerUtils.RocksDBMemoryFactory.DEFAULT.createCache(j, d);
            ((List) this.createdCaches.get()).add(createCache);
            return createCache;
        }

        public WriteBufferManager createWriteBufferManager(long j, Cache cache) {
            WriteBufferManager createWriteBufferManager = RocksDBMemoryControllerUtils.RocksDBMemoryFactory.DEFAULT.createWriteBufferManager(j, cache);
            ((List) this.createdWriteBufferManagers.get()).add(createWriteBufferManager);
            return createWriteBufferManager;
        }
    }

    @Before
    public void init() throws Exception {
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getConfiguration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(NUMBER_OF_TASKS).build());
        this.cluster.before();
    }

    @After
    public void destroy() {
        this.cluster.after();
    }

    @Test
    public void testBlockCache() throws Exception {
        TestingRocksDBMemoryFactory testingRocksDBMemoryFactory = new TestingRocksDBMemoryFactory(this.sharedObjects.add(new CopyOnWriteArrayList()), this.sharedObjects.add(new CopyOnWriteArrayList()));
        ArrayList<JobID> arrayList = new ArrayList(NUMBER_OF_JOBS);
        for (int i = 0; i < NUMBER_OF_JOBS; i++) {
            try {
                arrayList.add(this.cluster.getRestClusterClient().submitJob(dag(testingRocksDBMemoryFactory)).get());
            } catch (Throwable th) {
                for (JobID jobID : arrayList) {
                    try {
                        this.cluster.getRestClusterClient().cancel(jobID).get();
                    } catch (Exception e) {
                        this.log.warn("Can not cancel job {}", jobID, e);
                    }
                }
                throw th;
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            CommonTestUtils.waitForAllTaskRunning(this.cluster.getMiniCluster(), (JobID) it.next(), false);
        }
        Assert.assertEquals(1L, r0.size());
        Assert.assertEquals(1L, r0.size());
        for (JobID jobID2 : arrayList) {
            try {
                this.cluster.getRestClusterClient().cancel(jobID2).get();
            } catch (Exception e2) {
                this.log.warn("Can not cancel job {}", jobID2, e2);
            }
        }
    }

    private JobGraph dag(RocksDBMemoryControllerUtils.RocksDBMemoryFactory rocksDBMemoryFactory) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        executionEnvironment.setParallelism(PARALLELISM);
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        embeddedRocksDBStateBackend.setRocksDBMemoryFactory(rocksDBMemoryFactory);
        executionEnvironment.setStateBackend(embeddedRocksDBStateBackend);
        executionEnvironment.enableCheckpointing(86400000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).keyBy(l -> {
            return l;
        }).map(new RichMapFunction<Long, Long>() { // from class: org.apache.flink.test.state.TaskManagerWideRocksDbMemorySharingITCase.1
            private ListState<byte[]> state;
            private int payloadSize;

            public void open(OpenContext openContext) throws Exception {
                super.open(openContext);
                this.state = getRuntimeContext().getListState(new ListStateDescriptor("state", byte[].class));
                this.payloadSize = TaskManagerWideRocksDbMemorySharingITCase.PARALLELISM + new Random().nextInt(7);
            }

            public Long map(Long l2) throws Exception {
                this.state.add(new byte[this.payloadSize]);
                Thread.sleep(1L);
                return l2;
            }
        }).sinkTo(new DiscardingSink());
        return executionEnvironment.getStreamGraph().getJobGraph();
    }

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, SHARED_MEMORY);
        configuration.set(RocksDBOptions.USE_MANAGED_MEMORY, false);
        return configuration;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -981161537:
                if (implMethodName.equals("lambda$dag$74149642$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
