/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog.restore;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.StateChangeOperation;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactoryImpl;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.state.changelog.restore.StateChangeApplier;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helper that replays a single {@link StateChange} against a
 * {@link ChangelogKeyedStateBackend}.
 *
 * <p>The first byte of a change encodes a {@link StateChangeOperation}. A
 * {@link StateChangeOperation#METADATA} change (re-)registers a state on the backend; any other
 * operation is forwarded to the {@link StateChangeApplier} of an already registered state.
 *
 * <p>This class only has static members and cannot be instantiated.
 */
class ChangelogBackendLogApplier {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogBackendLogApplier.class);

    /**
     * Decodes the given change and applies it to the backend.
     *
     * @param stateChange the serialized change together with its key group
     * @param changelogBackend the backend the change is applied to
     * @param classLoader class loader handed to the state meta info reader for metadata changes
     * @throws Exception if reading the change or applying it fails
     */
    public static void apply(StateChange stateChange, ChangelogKeyedStateBackend<?> changelogBackend, ClassLoader classLoader) throws Exception {
        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange()));
        // The leading byte is the operation code; the rest of the stream is the operation payload.
        StateChangeOperation operation = StateChangeOperation.byCode(in.readByte());
        applyOperation(operation, stateChange.getKeyGroup(), changelogBackend, in, classLoader, ChangelogApplierFactoryImpl.INSTANCE);
    }

    /**
     * Dispatches one decoded operation.
     *
     * <p>Metadata changes are always applied. Data changes are applied only if {@code keyGroup}
     * is contained in the backend's key group range; otherwise they are skipped silently.
     */
    private static void applyOperation(StateChangeOperation operation, int keyGroup, ChangelogKeyedStateBackend<?> backend, DataInputView in, ClassLoader classLoader, ChangelogApplierFactory factory) throws Exception {
        LOG.debug("apply {} in key group {}", operation, keyGroup);
        if (operation == StateChangeOperation.METADATA) {
            applyMetaDataChange(in, backend, classLoader);
        } else if (backend.getKeyGroupRange().contains(keyGroup)) {
            applyDataChange(in, factory, backend, operation);
        }
        // else: data change for a key group outside of this backend's range - nothing to do.
    }

    /**
     * Reads a {@link StateMetaInfoSnapshot} from the stream and registers the corresponding
     * key/value or priority queue state on the backend.
     *
     * @throws RuntimeException if the snapshot has a backend state type other than
     *     {@code KEY_VALUE} or {@code PRIORITY_QUEUE}
     */
    private static void applyMetaDataChange(DataInputView in, ChangelogKeyedStateBackend<?> backend, ClassLoader classLoader) throws Exception {
        StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, classLoader);
        switch (snapshot.getBackendStateType()) {
            case KEY_VALUE:
                restoreKvMetaData(backend, snapshot, in);
                return;
            case PRIORITY_QUEUE:
                restorePqMetaData(backend, snapshot);
                return;
            default:
                throw new RuntimeException("Unsupported state type: " + snapshot.getBackendStateType() + ", state: " + snapshot.getName());
        }
    }

    /**
     * Reads the TTL configuration: a boolean presence flag, followed (if {@code true}) by a
     * Java-serialized {@link StateTtlConfig}.
     *
     * @return the deserialized config, or {@link StateTtlConfig#DISABLED} if the flag is
     *     {@code false}
     * @throws IOException on read failure, or wrapping a {@link ClassNotFoundException} raised
     *     during deserialization
     */
    private static StateTtlConfig readTtlConfig(DataInputView in) throws IOException {
        if (!in.readBoolean()) {
            return StateTtlConfig.DISABLED;
        }
        // NOTE(review): this uses Java native deserialization on the changelog bytes; that is
        // only safe if the changelog comes from a trusted source - confirm.
        try (ObjectInputStream objectInputStream = new ObjectInputStream(new DataInputViewStream(in))) {
            return (StateTtlConfig) objectInputStream.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }

    /**
     * Reads the optional default value: a boolean presence flag, followed (if {@code true}) by
     * the value serialized with the state serializer of {@code meta}.
     *
     * @return the deserialized default value, or {@code null} if none was written
     */
    @Nullable
    @SuppressWarnings("rawtypes")
    private static Object readDefaultValue(DataInputView in, RegisteredKeyValueStateBackendMetaInfo meta) throws IOException {
        return in.readBoolean() ? meta.getStateSerializer().deserialize(in) : null;
    }

    /**
     * Registers a key/value state described by {@code snapshot} on the backend.
     *
     * <p>After the snapshot, the stream is expected to contain the TTL config followed by the
     * default value; they are read in exactly that order.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void restoreKvMetaData(ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot, DataInputView in) throws Exception {
        RegisteredKeyValueStateBackendMetaInfo meta = new RegisteredKeyValueStateBackendMetaInfo(snapshot);
        // Order matters: both reads consume from the same stream.
        StateTtlConfig ttlConfig = readTtlConfig(in);
        Object defaultValue = readDefaultValue(in, meta);
        StateDescriptor stateDescriptor = toStateDescriptor(meta, defaultValue);
        if (ttlConfig.isEnabled()) {
            stateDescriptor.enableTimeToLive(ttlConfig);
        }
        backend.getOrCreateKeyedState(meta.getNamespaceSerializer(), stateDescriptor);
    }

    /**
     * Builds a {@link StateDescriptor} matching the state type recorded in {@code meta}.
     *
     * <p>{@code defaultValue} is only used for {@code VALUE} state. Aggregating and reducing
     * descriptors are created with the delegating functions supplied by
     * {@link FunctionDelegationHelper}.
     *
     * @throws IllegalArgumentException if the state type is none of {@code VALUE}, {@code MAP},
     *     {@code LIST}, {@code AGGREGATING} or {@code REDUCING}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static StateDescriptor toStateDescriptor(RegisteredKeyValueStateBackendMetaInfo meta, @Nullable Object defaultValue) {
        switch (meta.getStateType()) {
            case VALUE:
                return new ValueStateDescriptor(meta.getName(), meta.getStateSerializer(), defaultValue);
            case MAP:
                MapSerializer mapSerializer = (MapSerializer) meta.getStateSerializer();
                return new MapStateDescriptor(meta.getName(), mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
            case LIST:
                return new ListStateDescriptor(meta.getName(), ((ListSerializer) meta.getStateSerializer()).getElementSerializer());
            case AGGREGATING:
                return new AggregatingStateDescriptor(meta.getName(), FunctionDelegationHelper.delegateAggregateFunction(), meta.getStateSerializer());
            case REDUCING:
                return new ReducingStateDescriptor(meta.getName(), FunctionDelegationHelper.delegateReduceFunction(), meta.getStateSerializer());
            default:
                throw new IllegalArgumentException(meta.getStateType().toString());
        }
    }

    /** Registers a priority queue state described by {@code snapshot} on the backend. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void restorePqMetaData(ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot) {
        RegisteredPriorityQueueStateBackendMetaInfo meta = new RegisteredPriorityQueueStateBackendMetaInfo(snapshot);
        backend.create(meta.getName(), meta.getElementSerializer());
    }

    /**
     * Reads an {@code int} format version from the stream and then the snapshot itself, using
     * the keyed-state reader registered for that version.
     */
    private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(DataInputView in, ClassLoader classLoader) throws IOException {
        int version = in.readInt();
        StateMetaInfoReader reader = StateMetaInfoSnapshotReadersWriters.getReader(version, StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
        return reader.readStateMetaInfoSnapshot(in, classLoader);
    }

    /**
     * Applies a data change to a state that is already registered on the backend.
     *
     * <p>The payload starts with the state name (UTF) and a one-byte backend state type code;
     * the remainder is consumed by the state's {@link StateChangeApplier}.
     */
    private static void applyDataChange(DataInputView in, ChangelogApplierFactory factory, ChangelogKeyedStateBackend<?> backend, StateChangeOperation operation) throws Exception {
        String name = Preconditions.checkNotNull(in.readUTF());
        StateMetaInfoSnapshot.BackendStateType type = StateMetaInfoSnapshot.BackendStateType.byCode(in.readByte());
        ChangelogState state = backend.getExistingStateForRecovery(name, type);
        state.getChangeApplier(factory).apply(operation, in);
    }

    /** Not instantiable: static utility class. */
    private ChangelogBackendLogApplier() {
    }
}

