/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.changelog.fs.OutputStreamWithPos;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StateChangeFormat
implements StateChangelogHandleStreamHandleReader.StateChangeIterator {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangeFormat.class);

    Map<StateChangeSet, Long> write(OutputStreamWithPos os, Collection<StateChangeSet> changeSets) throws IOException {
        ArrayList<StateChangeSet> sorted = new ArrayList<StateChangeSet>(changeSets);
        sorted.sort(Comparator.comparing(StateChangeSet::getLogId).thenComparing(StateChangeSet::getSequenceNumber));
        DataOutputViewStreamWrapper dataOutput = new DataOutputViewStreamWrapper((OutputStream)os);
        HashMap<StateChangeSet, Long> pendingResults = new HashMap<StateChangeSet, Long>();
        for (StateChangeSet changeSet : sorted) {
            pendingResults.put(changeSet, os.getPos());
            this.writeChangeSet(dataOutput, changeSet.getChanges());
        }
        return pendingResults;
    }

    private void writeChangeSet(DataOutputViewStreamWrapper output, List<StateChange> changes) throws IOException {
        Map<Integer, List<StateChange>> byKeyGroup = changes.stream().collect(Collectors.groupingBy(StateChange::getKeyGroup));
        TreeMap<Integer, List<StateChange>> sorted = new TreeMap<Integer, List<StateChange>>(byKeyGroup);
        output.writeInt(sorted.size());
        for (Map.Entry entry : sorted.entrySet()) {
            output.writeInt(((List)entry.getValue()).size());
            output.writeInt(((Integer)entry.getKey()).intValue());
            for (StateChange stateChange : (List)entry.getValue()) {
                output.writeInt(stateChange.getChange().length);
                output.write(stateChange.getChange());
            }
        }
    }

    public CloseableIterator<StateChange> read(StreamStateHandle handle, long offset) throws IOException {
        final FSDataInputStream stream = handle.openInputStream();
        final DataInputViewStreamWrapper input = this.wrap((InputStream)stream);
        if (stream.getPos() != offset) {
            LOG.debug("seek from {} to {}", (Object)stream.getPos(), (Object)offset);
            input.skipBytesToRead((int)offset);
        }
        return new CloseableIterator<StateChange>(){
            int numUnreadGroups;
            int numLeftInGroup;
            int keyGroup;
            {
                this.numUnreadGroups = input.readInt();
                this.numLeftInGroup = this.numUnreadGroups-- == 0 ? 0 : input.readInt();
                this.keyGroup = this.numLeftInGroup == 0 ? 0 : input.readInt();
            }

            public boolean hasNext() {
                this.advance();
                return this.numLeftInGroup > 0;
            }

            private void advance() {
                if (this.numLeftInGroup == 0 && this.numUnreadGroups > 0) {
                    --this.numUnreadGroups;
                    try {
                        this.numLeftInGroup = input.readInt();
                        this.keyGroup = input.readInt();
                    }
                    catch (IOException e) {
                        ExceptionUtils.rethrow((Throwable)e);
                    }
                }
            }

            public StateChange next() {
                this.advance();
                if (this.numLeftInGroup == 0) {
                    throw new NoSuchElementException();
                }
                --this.numLeftInGroup;
                try {
                    return this.readChange();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            private StateChange readChange() throws IOException {
                int size = input.readInt();
                byte[] bytes = new byte[size];
                IOUtils.readFully((InputStream)input, (byte[])bytes, (int)0, (int)size);
                return new StateChange(this.keyGroup, bytes);
            }

            public void close() throws Exception {
                LOG.trace("close {}", (Object)stream);
                stream.close();
            }
        };
    }

    private DataInputViewStreamWrapper wrap(InputStream stream) throws IOException {
        boolean compressed = (stream = new BufferedInputStream(stream)).read() == 1;
        return new DataInputViewStreamWrapper(compressed ? SnappyStreamCompressionDecorator.INSTANCE.decorateWithCompression(stream) : stream);
    }
}

