package org.apache.flink.autoscaler.jdbc.state;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JacksonException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AutoScalerStateStore} implementation that delegates persistence to a
 * {@link JdbcStateStore}.
 *
 * <p>Every piece of autoscaler state is converted to a {@code String} and handed to the
 * underlying store under the pair (job key string, {@link StateType}). The job key string is
 * obtained by calling {@code toString()} on the key returned by the context.
 *
 * @param <KEY> type of the job key; its {@code toString()} value identifies the job in the store
 * @param <Context> autoscaler context type that exposes the job key
 */
@Experimental
public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> implements AutoScalerStateStore<KEY, Context> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcAutoScalerStateStore.class);

    /**
     * Shared mapper used for all Jackson based (de)serialization in this class. It is created
     * with the default {@link ObjectMapper} constructor and has the Java time module and the
     * autoscaler SerDe module registered.
     */
    protected static final ObjectMapper YAML_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).registerModule(new AutoScalerSerDeModule());

    private final JdbcStateStore jdbcStateStore;

    /**
     * Creates a state store backed by the given JDBC store.
     *
     * @param jdbcStateStore store that all reads, writes and removals are delegated to
     */
    public JdbcAutoScalerStateStore(JdbcStateStore jdbcStateStore) {
        this.jdbcStateStore = jdbcStateStore;
    }

    /**
     * Serializes the scaling history and stores it under {@link StateType#SCALING_HISTORY}.
     *
     * @param context context identifying the job
     * @param map scaling history per job vertex
     * @throws Exception if serialization fails
     */
    public void storeScalingHistory(Context context, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> map) throws Exception {
        this.jdbcStateStore.putSerializedState(getSerializeKey(context), StateType.SCALING_HISTORY, serializeScalingHistory(map));
    }

    /**
     * Reads the scaling history of the job.
     *
     * @param context context identifying the job
     * @return the stored history, or a new empty map when nothing is stored or the stored value
     *     cannot be deserialized (in which case the stored value is removed)
     */
    @Nonnull
    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory(Context context) {
        String jobKey = getSerializeKey(context);
        Optional<String> serializedState = this.jdbcStateStore.getSerializedState(jobKey, StateType.SCALING_HISTORY);
        if (serializedState.isEmpty()) {
            return new HashMap<>();
        }
        try {
            return deserializeScalingHistory(serializedState.get());
        } catch (JacksonException e) {
            // An unreadable entry is dropped so that the next read starts from a clean state.
            LOG.error("Could not deserialize scaling history, possibly the format changed. Discarding...", e);
            this.jdbcStateStore.removeSerializedState(jobKey, StateType.SCALING_HISTORY);
            return new HashMap<>();
        }
    }

    /**
     * Removes the stored scaling history of the job.
     *
     * @param context context identifying the job
     */
    public void removeScalingHistory(Context context) {
        this.jdbcStateStore.removeSerializedState(getSerializeKey(context), StateType.SCALING_HISTORY);
    }

    /**
     * Serializes the scaling tracking and stores it under {@link StateType#SCALING_TRACKING}.
     *
     * @param context context identifying the job
     * @param scalingTracking tracking information to store
     * @throws Exception if serialization fails
     */
    public void storeScalingTracking(Context context, ScalingTracking scalingTracking) throws Exception {
        this.jdbcStateStore.putSerializedState(getSerializeKey(context), StateType.SCALING_TRACKING, serializeScalingTracking(scalingTracking));
    }

    /**
     * Reads the scaling tracking of the job.
     *
     * @param context context identifying the job
     * @return the stored tracking, or a new {@link ScalingTracking} when nothing is stored or the
     *     stored value cannot be deserialized (in which case the stored value is removed)
     */
    public ScalingTracking getScalingTracking(Context context) {
        String jobKey = getSerializeKey(context);
        Optional<String> serializedState = this.jdbcStateStore.getSerializedState(jobKey, StateType.SCALING_TRACKING);
        if (serializedState.isEmpty()) {
            return new ScalingTracking();
        }
        try {
            return deserializeScalingTracking(serializedState.get());
        } catch (JacksonException e) {
            LOG.error("Could not deserialize rescaling history, possibly the format changed. Discarding...", e);
            this.jdbcStateStore.removeSerializedState(jobKey, StateType.SCALING_TRACKING);
            return new ScalingTracking();
        }
    }

    /**
     * Serializes the collected metrics and stores them under {@link StateType#COLLECTED_METRICS}.
     *
     * @param context context identifying the job
     * @param sortedMap collected metrics ordered by timestamp
     * @throws Exception if serialization fails
     */
    public void storeCollectedMetrics(Context context, SortedMap<Instant, CollectedMetrics> sortedMap) throws Exception {
        this.jdbcStateStore.putSerializedState(getSerializeKey(context), StateType.COLLECTED_METRICS, serializeEvaluatedMetrics(sortedMap));
    }

    /**
     * Reads the collected metrics of the job.
     *
     * @param context context identifying the job
     * @return the stored metrics, or a new empty map when nothing is stored or the stored value
     *     cannot be deserialized (in which case the stored value is removed)
     */
    @Nonnull
    public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context context) {
        String jobKey = getSerializeKey(context);
        Optional<String> serializedState = this.jdbcStateStore.getSerializedState(jobKey, StateType.COLLECTED_METRICS);
        if (serializedState.isEmpty()) {
            return new TreeMap<>();
        }
        try {
            return deserializeEvaluatedMetrics(serializedState.get());
        } catch (JacksonException e) {
            LOG.error("Could not deserialize metric history, possibly the format changed. Discarding...", e);
            this.jdbcStateStore.removeSerializedState(jobKey, StateType.COLLECTED_METRICS);
            return new TreeMap<>();
        }
    }

    /**
     * Removes the stored collected metrics of the job.
     *
     * @param context context identifying the job
     */
    public void removeCollectedMetrics(Context context) {
        this.jdbcStateStore.removeSerializedState(getSerializeKey(context), StateType.COLLECTED_METRICS);
    }

    /**
     * Converts the parallelism overrides to a string and stores them under
     * {@link StateType#PARALLELISM_OVERRIDES}.
     *
     * @param context context identifying the job
     * @param map parallelism overrides to store
     */
    public void storeParallelismOverrides(Context context, Map<String, String> map) {
        this.jdbcStateStore.putSerializedState(getSerializeKey(context), StateType.PARALLELISM_OVERRIDES, serializeParallelismOverrides(map));
    }

    /**
     * Reads the parallelism overrides of the job.
     *
     * @param context context identifying the job
     * @return the stored overrides, or a new empty map when nothing is stored
     */
    @Nonnull
    public Map<String, String> getParallelismOverrides(Context context) {
        return this.jdbcStateStore
                .getSerializedState(getSerializeKey(context), StateType.PARALLELISM_OVERRIDES)
                .map(JdbcAutoScalerStateStore::deserializeParallelismOverrides)
                .orElseGet(HashMap::new);
    }

    /**
     * Removes the stored parallelism overrides of the job.
     *
     * @param context context identifying the job
     */
    public void removeParallelismOverrides(Context context) {
        this.jdbcStateStore.removeSerializedState(getSerializeKey(context), StateType.PARALLELISM_OVERRIDES);
    }

    /**
     * Serializes the config changes and stores them under {@link StateType#CONFIG_OVERRIDES}.
     *
     * @param context context identifying the job
     * @param configChanges config changes to store
     */
    public void storeConfigChanges(Context context, ConfigChanges configChanges) {
        // NOTE(review): serializeConfigOverrides returns null when serialization fails, so a null
        // value may be handed to the store here - confirm that JdbcStateStore tolerates that.
        this.jdbcStateStore.putSerializedState(getSerializeKey(context), StateType.CONFIG_OVERRIDES, serializeConfigOverrides(configChanges));
    }

    /**
     * Reads the config changes of the job.
     *
     * @param context context identifying the job
     * @return the stored config changes, or a new {@link ConfigChanges} when nothing is stored or
     *     the stored value cannot be deserialized
     */
    @Nonnull
    public ConfigChanges getConfigChanges(Context context) {
        // A failed deserialization yields null, which Optional.map turns into an empty Optional,
        // so the default below also covers unreadable entries.
        return this.jdbcStateStore
                .getSerializedState(getSerializeKey(context), StateType.CONFIG_OVERRIDES)
                .map(JdbcAutoScalerStateStore::deserializeConfigOverrides)
                .orElseGet(ConfigChanges::new);
    }

    /**
     * Removes the stored config changes of the job.
     *
     * @param context context identifying the job
     */
    public void removeConfigChanges(Context context) {
        this.jdbcStateStore.removeSerializedState(getSerializeKey(context), StateType.CONFIG_OVERRIDES);
    }

    /**
     * Delegates to {@link JdbcStateStore#clearAll} for the job identified by the context.
     *
     * @param context context identifying the job
     */
    public void clearAll(Context context) {
        this.jdbcStateStore.clearAll(getSerializeKey(context));
    }

    /**
     * Delegates to {@link JdbcStateStore#flush} for the job identified by the context.
     *
     * @param context context identifying the job
     * @throws Exception if the underlying store fails to flush
     */
    public void flush(Context context) throws Exception {
        this.jdbcStateStore.flush(getSerializeKey(context));
    }

    /**
     * Delegates to {@link JdbcStateStore#removeInfoFromCache} for the given job key.
     *
     * @param key key of the job
     */
    public void removeInfoFromCache(KEY key) {
        this.jdbcStateStore.removeInfoFromCache(getSerializeKey(key));
    }

    /** Returns the string under which the state of the context's job is stored. */
    private String getSerializeKey(Context context) {
        return getSerializeKey(context.getJobKey());
    }

    /** Returns the string form of the job key, as produced by {@code toString()}. */
    private String getSerializeKey(KEY key) {
        return key.toString();
    }

    /**
     * Serializes the scaling history with {@link #YAML_MAPPER}.
     *
     * @param map scaling history per job vertex
     * @return serialized representation
     * @throws Exception if serialization fails
     */
    protected static String serializeScalingHistory(Map<JobVertexID, SortedMap<Instant, ScalingSummary>> map) throws Exception {
        return YAML_MAPPER.writeValueAsString(map);
    }

    /** Parses a scaling history previously produced by {@link #serializeScalingHistory}. */
    private static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> deserializeScalingHistory(String str) throws JacksonException {
        return YAML_MAPPER.readValue(str, new TypeReference<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>>() {});
    }

    /**
     * Serializes the scaling tracking with {@link #YAML_MAPPER}.
     *
     * @param scalingTracking tracking information to serialize
     * @return serialized representation
     * @throws Exception if serialization fails
     */
    protected static String serializeScalingTracking(ScalingTracking scalingTracking) throws Exception {
        return YAML_MAPPER.writeValueAsString(scalingTracking);
    }

    /** Parses a scaling tracking previously produced by {@link #serializeScalingTracking}. */
    private static ScalingTracking deserializeScalingTracking(String str) throws JacksonException {
        return YAML_MAPPER.readValue(str, new TypeReference<ScalingTracking>() {});
    }

    /**
     * Serializes the collected metrics with {@link #YAML_MAPPER}.
     *
     * @param sortedMap collected metrics ordered by timestamp
     * @return serialized representation
     * @throws Exception if serialization fails
     */
    @VisibleForTesting
    protected static String serializeEvaluatedMetrics(SortedMap<Instant, CollectedMetrics> sortedMap) throws Exception {
        return YAML_MAPPER.writeValueAsString(sortedMap);
    }

    /** Parses collected metrics previously produced by {@link #serializeEvaluatedMetrics}. */
    private static SortedMap<Instant, CollectedMetrics> deserializeEvaluatedMetrics(String str) throws JacksonException {
        return YAML_MAPPER.readValue(str, new TypeReference<SortedMap<Instant, CollectedMetrics>>() {});
    }

    /** Converts the overrides map to a string via {@link ConfigurationUtils#convertValue}. */
    private static String serializeParallelismOverrides(Map<String, String> map) {
        return ConfigurationUtils.convertValue(map, String.class);
    }

    /** Converts a stored overrides string back to a map via {@link ConfigurationUtils#convertValue}. */
    @SuppressWarnings("unchecked") // convertValue is asked for a raw Map; entries are expected to be strings
    private static Map<String, String> deserializeParallelismOverrides(String str) {
        return ConfigurationUtils.convertValue(str, Map.class);
    }

    /**
     * Serializes the config changes with {@link #YAML_MAPPER}.
     *
     * @return serialized representation, or {@code null} if serialization fails (the failure is
     *     logged)
     */
    @Nullable
    private static String serializeConfigOverrides(ConfigChanges configChanges) {
        try {
            return YAML_MAPPER.writeValueAsString(configChanges);
        } catch (Exception e) {
            LOG.error("Failed to serialize ConfigOverrides", e);
            return null;
        }
    }

    /**
     * Parses config changes previously produced by {@link #serializeConfigOverrides}.
     *
     * @return the parsed config changes, or {@code null} if parsing fails (the failure is logged)
     */
    @Nullable
    private static ConfigChanges deserializeConfigOverrides(String str) {
        try {
            return YAML_MAPPER.readValue(str, new TypeReference<ConfigChanges>() {});
        } catch (Exception e) {
            LOG.error("Failed to deserialize ConfigOverrides", e);
            return null;
        }
    }
}
