package org.apache.flink.autoscaler.state;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.class */
public abstract class AbstractAutoScalerStateStoreTest<KEY, Context extends JobAutoScalerContext<KEY>> {
    protected AutoScalerStateStore<KEY, Context> stateStore;
    protected Context ctx;

    protected abstract AutoScalerStateStore<KEY, Context> createPhysicalAutoScalerStateStore() throws Exception;

    protected AutoScalerStateStore<KEY, Context> createCachedAutoScalerStateStore() throws Exception {
        return createPhysicalAutoScalerStateStore();
    }

    protected abstract Context createJobContext();

    protected void preSetup() throws Exception {
    }

    @BeforeEach
    void setup() throws Exception {
        preSetup();
        this.stateStore = createCachedAutoScalerStateStore();
        this.ctx = createJobContext();
    }

    @Test
    void testTopologyUpdate() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        HashMap hashMap = new HashMap();
        hashMap.put(jobVertexID, new ScalingSummary(1, 2, (Map) null));
        hashMap.put(jobVertexID2, new ScalingSummary(1, 2, (Map) null));
        Instant now = Instant.now();
        ScalingHistoryUtils.addToScalingHistoryAndStore(this.stateStore, this.ctx, now, hashMap);
        this.stateStore.flush(this.ctx);
        Assertions.assertEquals(hashMap.keySet(), ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now).keySet());
        Assertions.assertEquals(hashMap.keySet(), ScalingHistoryUtils.getTrimmedScalingHistory(createCachedAutoScalerStateStore(), this.ctx, now).keySet());
        Assertions.assertEquals(hashMap.keySet(), ScalingHistoryUtils.getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), this.ctx, now).keySet());
        ScalingHistoryUtils.updateVertexList(this.stateStore, this.ctx, now, Set.of(jobVertexID2, jobVertexID3));
        this.stateStore.flush(this.ctx);
        Assertions.assertEquals(Set.of(jobVertexID2), ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now).keySet());
        Assertions.assertEquals(Set.of(jobVertexID2), ScalingHistoryUtils.getTrimmedScalingHistory(createCachedAutoScalerStateStore(), this.ctx, now).keySet());
        Assertions.assertEquals(Set.of(jobVertexID2), ScalingHistoryUtils.getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), this.ctx, now).keySet());
    }

    @Test
    void testHistorySizeConfigs() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        HashMap hashMap = new HashMap();
        hashMap.put(jobVertexID, new ScalingSummary(1, 2, (Map) null));
        Configuration configuration = this.ctx.getConfiguration();
        configuration.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT, 2);
        configuration.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, Duration.ofSeconds(10L));
        Instant now = Instant.now();
        ScalingHistoryUtils.addToScalingHistoryAndStore(this.stateStore, this.ctx, now, hashMap);
        Assertions.assertEquals(1, ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now).get(jobVertexID)).size());
        ScalingHistoryUtils.addToScalingHistoryAndStore(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(1L)), hashMap);
        ScalingHistoryUtils.addToScalingHistoryAndStore(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(2L)), hashMap);
        Assertions.assertEquals(2, ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(2L))).get(jobVertexID)).size());
        Assertions.assertEquals(Set.of(now.plus((TemporalAmount) Duration.ofSeconds(1L)), now.plus((TemporalAmount) Duration.ofSeconds(2L))), ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(2L))).get(jobVertexID)).keySet());
        ScalingHistoryUtils.addToScalingHistoryAndStore(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(15L)), hashMap);
        this.stateStore.flush(this.ctx);
        Assertions.assertEquals(1, ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(15L))).get(jobVertexID)).size());
        Assertions.assertEquals(Set.of(now.plus((TemporalAmount) Duration.ofSeconds(15L))), ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(15L))).get(jobVertexID)).keySet());
        Assertions.assertEquals(Set.of(now.plus((TemporalAmount) Duration.ofSeconds(15L))), ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(createCachedAutoScalerStateStore(), this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(15L))).get(jobVertexID)).keySet());
        Assertions.assertEquals(Set.of(now.plus((TemporalAmount) Duration.ofSeconds(15L))), ((SortedMap) ScalingHistoryUtils.getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), this.ctx, now.plus((TemporalAmount) Duration.ofSeconds(15L))).get(jobVertexID)).keySet());
    }

    @Test
    protected void testDiscardAllState() throws Exception {
        this.stateStore.storeCollectedMetrics(this.ctx, new TreeMap(Map.of(Instant.now(), new CollectedMetrics())));
        this.stateStore.storeScalingHistory(this.ctx, Map.of(new JobVertexID(), new TreeMap(Map.of(Instant.now(), new ScalingSummary()))));
        this.stateStore.storeParallelismOverrides(this.ctx, Map.of(new JobVertexID().toHexString(), "23"));
        this.stateStore.storeConfigChanges(this.ctx, new ConfigChanges().addOverride("config.value", "value"));
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getCollectedMetrics(this.ctx)).isNotEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getScalingHistory(this.ctx)).isNotEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getParallelismOverrides(this.ctx)).isNotEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getConfigChanges(this.ctx).getOverrides()).isNotEmpty();
        this.stateStore.flush(this.ctx);
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getCollectedMetrics(this.ctx)).isNotEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getScalingHistory(this.ctx)).isNotEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getParallelismOverrides(this.ctx)).isNotEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getConfigChanges(this.ctx).getOverrides()).isNotEmpty();
        this.stateStore.clearAll(this.ctx);
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getCollectedMetrics(this.ctx)).isEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getScalingHistory(this.ctx)).isEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getParallelismOverrides(this.ctx)).isEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getConfigChanges(this.ctx).getOverrides()).isEmpty();
    }
}
