/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.CopyOnWriteSkipListStateMap;
import org.apache.flink.runtime.state.heap.SkipListValueSerializer;
import org.apache.flink.runtime.state.heap.StateMapSnapshot;
import org.apache.flink.util.ResourceGuard;

public class CopyOnWriteSkipListStateMapSnapshot<K, N, S>
extends StateMapSnapshot<K, N, S, CopyOnWriteSkipListStateMap<K, N, S>> {
    private final int snapshotVersion;
    @Nonnegative
    private final int numberOfEntriesInSnapshotData;
    private final ResourceGuard.Lease lease;

    CopyOnWriteSkipListStateMapSnapshot(CopyOnWriteSkipListStateMap<K, N, S> owningStateMap, ResourceGuard.Lease lease) {
        super(owningStateMap);
        this.snapshotVersion = owningStateMap.getStateMapVersion();
        this.numberOfEntriesInSnapshotData = owningStateMap.size();
        this.lease = lease;
    }

    int getSnapshotVersion() {
        return this.snapshotVersion;
    }

    public void release() {
        ((CopyOnWriteSkipListStateMap)this.owningStateMap).releaseSnapshot(this);
        this.lease.close();
    }

    public void writeState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, TypeSerializer<S> stateSerializer, @Nonnull DataOutputView dov, @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) throws IOException {
        if (stateSnapshotTransformer == null) {
            this.writeStateWithNoTransform(dov);
        } else {
            this.writeStateWithTransform(stateSerializer, dov, stateSnapshotTransformer);
        }
    }

    private void writeStateWithNoTransform(@Nonnull DataOutputView dov) throws IOException {
        dov.writeInt(this.numberOfEntriesInSnapshotData);
        SnapshotNodeIterator nodeIterator = new SnapshotNodeIterator(true);
        while (nodeIterator.hasNext()) {
            Tuple2<Long, Long> tuple = nodeIterator.next();
            this.writeKeyAndNamespace((Long)tuple.f0, dov);
            this.writeValue((Long)tuple.f1, dov);
        }
    }

    private void writeStateWithTransform(TypeSerializer<S> stateSerializer, @Nonnull DataOutputView dov, @Nonnull StateSnapshotTransformer<S> stateSnapshotTransformer) throws IOException {
        SkipListValueSerializer<S> skipListValueSerializer = new SkipListValueSerializer<S>(stateSerializer);
        SnapshotNodeIterator transformNodeIterator = new SnapshotNodeIterator(true);
        int size = 0;
        while (transformNodeIterator.hasNext()) {
            Tuple2<Long, Long> tuple = transformNodeIterator.next();
            S oldState = ((CopyOnWriteSkipListStateMap)this.owningStateMap).helpGetState((Long)tuple.f1, skipListValueSerializer);
            Object newState = stateSnapshotTransformer.filterOrTransform(oldState);
            if (newState == null) continue;
            ++size;
        }
        dov.writeInt(size);
        SnapshotNodeIterator writeNodeIterator = new SnapshotNodeIterator(false);
        while (writeNodeIterator.hasNext()) {
            Tuple2<Long, Long> tuple = writeNodeIterator.next();
            S oldState = ((CopyOnWriteSkipListStateMap)this.owningStateMap).helpGetState((Long)tuple.f1, skipListValueSerializer);
            Object newState = stateSnapshotTransformer.filterOrTransform(oldState);
            if (newState == null) continue;
            this.writeKeyAndNamespace((Long)tuple.f0, dov);
            stateSerializer.serialize(newState, dov);
        }
    }

    private void writeKeyAndNamespace(long nodeId, DataOutputView outputView) throws IOException {
        Tuple2<byte[], byte[]> tuple = ((CopyOnWriteSkipListStateMap)this.owningStateMap).helpGetBytesForKeyAndNamespace(nodeId);
        outputView.write((byte[])tuple.f1);
        outputView.write((byte[])tuple.f0);
    }

    private void writeValue(long valuePointer, DataOutputView outputView) throws IOException {
        outputView.write(((CopyOnWriteSkipListStateMap)this.owningStateMap).helpGetBytesForState(valuePointer));
    }

    class SnapshotNodeIterator
    implements Iterator<Tuple2<Long, Long>> {
        private boolean isPrune;
        private long nextNode;
        private long nextValuePointer;

        SnapshotNodeIterator(boolean isPrune) {
            this.isPrune = isPrune;
            this.nextNode = -2L;
            this.advance();
        }

        private void advance() {
            if (this.nextNode == -1L) {
                return;
            }
            long node = ((CopyOnWriteSkipListStateMap)CopyOnWriteSkipListStateMapSnapshot.this.owningStateMap).helpGetNextNode(this.nextNode, 0);
            long valuePointer = -1L;
            while (node != -1L) {
                int valueLen;
                valuePointer = this.isPrune ? ((CopyOnWriteSkipListStateMap)CopyOnWriteSkipListStateMapSnapshot.this.owningStateMap).getAndPruneValueForSnapshot(node, CopyOnWriteSkipListStateMapSnapshot.this.snapshotVersion) : ((CopyOnWriteSkipListStateMap)CopyOnWriteSkipListStateMapSnapshot.this.owningStateMap).getValueForSnapshot(node, CopyOnWriteSkipListStateMapSnapshot.this.snapshotVersion);
                int n = valueLen = valuePointer == -1L ? 0 : ((CopyOnWriteSkipListStateMap)CopyOnWriteSkipListStateMapSnapshot.this.owningStateMap).helpGetValueLen(valuePointer);
                if (valueLen != 0) break;
                node = ((CopyOnWriteSkipListStateMap)CopyOnWriteSkipListStateMapSnapshot.this.owningStateMap).helpGetNextNode(node, 0);
            }
            this.nextNode = node;
            this.nextValuePointer = valuePointer;
        }

        @Override
        public boolean hasNext() {
            return this.nextNode != -1L;
        }

        @Override
        public Tuple2<Long, Long> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            long node = this.nextNode;
            long valuePointer = this.nextValuePointer;
            this.advance();
            return Tuple2.of((Object)node, (Object)valuePointer);
        }
    }
}

