/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractHeapState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

/**
 * Base class for heap-held key/value state whose snapshots are written through an
 * {@link FsStateBackend} checkpoint output stream.
 *
 * <p>The state itself lives in the map inherited from {@link AbstractHeapState}; this class
 * only adds the logic that serializes that map when a snapshot is requested.
 *
 * @param <K> type of the keys, serialized with the key serializer
 * @param <N> type of the namespaces, serialized with the namespace serializer
 * @param <SV> type of the stored values, serialized with the state serializer
 * @param <S> type of the {@link State} described by {@code SD}
 * @param <SD> type of the {@link StateDescriptor} for this state
 */
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

    /** Backend used to open the checkpoint output stream that snapshots are written to. */
    private final FsStateBackend backend;

    /**
     * Creates a state object without passing an initial state map to the superclass.
     *
     * @param backend backend that provides the checkpoint output streams
     * @param keySerializer serializer for the keys
     * @param namespaceSerializer serializer for the namespaces
     * @param stateSerializer serializer for the stored values
     * @param stateDesc descriptor of this state
     */
    public AbstractFsState(
            FsStateBackend backend,
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<SV> stateSerializer,
            SD stateDesc) {
        super(keySerializer, namespaceSerializer, stateSerializer, stateDesc);
        this.backend = backend;
    }

    /**
     * Creates a state object and hands the given state map to the superclass.
     *
     * @param backend backend that provides the checkpoint output streams
     * @param keySerializer serializer for the keys
     * @param namespaceSerializer serializer for the namespaces
     * @param stateSerializer serializer for the stored values
     * @param stateDesc descriptor of this state
     * @param state map from namespace to the key/value entries of that namespace
     */
    public AbstractFsState(
            FsStateBackend backend,
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<SV> stateSerializer,
            SD stateDesc,
            HashMap<N, Map<K, SV>> state) {
        super(keySerializer, namespaceSerializer, stateSerializer, stateDesc, state);
        this.backend = backend;
    }

    /**
     * Builds the snapshot object that refers to the file the state was written to.
     *
     * @param filePath path returned by the checkpoint output stream once it was closed
     * @return the snapshot for the serialized state
     */
    public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);

    /**
     * Serializes the complete state map to a checkpoint output stream obtained from the backend
     * and returns a snapshot for the resulting file.
     *
     * <p>Layout of the written data:
     * <ol>
     *   <li>{@code int}: number of namespaces</li>
     *   <li>for every namespace: the serialized namespace, an {@code int} with the number of
     *       entries in that namespace, followed by each entry as serialized key and then
     *       serialized value</li>
     * </ol>
     *
     * @param checkpointId id passed on to the backend when opening the output stream
     * @param timestamp timestamp passed on to the backend when opening the output stream
     * @return the snapshot created by {@link #createHeapSnapshot(Path)}
     * @throws Exception if opening the stream, serializing the state or closing the stream fails
     */
    @Override
    public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
        // try-with-resources makes sure the stream is closed when serialization throws.
        try (FsStateBackend.FsCheckpointStateOutputStream out =
                this.backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));

            outView.writeInt(this.state.size());
            for (Map.Entry<N, Map<K, SV>> namespaceState : this.state.entrySet()) {
                Map<K, SV> entries = namespaceState.getValue();

                this.namespaceSerializer.serialize(namespaceState.getKey(), outView);
                outView.writeInt(entries.size());
                for (Map.Entry<K, SV> entry : entries.entrySet()) {
                    this.keySerializer.serialize(entry.getKey(), outView);
                    this.stateSerializer.serialize(entry.getValue(), outView);
                }
            }
            // Push buffered bytes into the checkpoint stream before asking it for the path.
            outView.flush();

            return this.createHeapSnapshot(out.closeAndGetPath());
        }
    }
}

