/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.StateChangeLogger;
import org.apache.flink.state.changelog.StateChangeLoggingIterator;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
import org.apache.flink.state.changelog.restore.StateChangeApplier;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

public class ChangelogKeyGroupedPriorityQueue<T>
implements KeyGroupedInternalPriorityQueue<T>,
ChangelogState {
    private KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue;
    private final StateChangeLogger<T, Void> logger;
    private final TypeSerializer<T> serializer;

    public ChangelogKeyGroupedPriorityQueue(KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue, StateChangeLogger<T, Void> logger, TypeSerializer<T> serializer) {
        this.delegatedPriorityQueue = (KeyGroupedInternalPriorityQueue)Preconditions.checkNotNull(delegatedPriorityQueue);
        this.logger = (StateChangeLogger)Preconditions.checkNotNull(logger);
        this.serializer = serializer;
    }

    public Set<T> getSubsetForKeyGroup(int keyGroupId) {
        return this.delegatedPriorityQueue.getSubsetForKeyGroup(keyGroupId);
    }

    @Nullable
    public T poll() {
        Object polled = this.delegatedPriorityQueue.poll();
        this.logRemoval(polled);
        return (T)polled;
    }

    @Nullable
    public T peek() {
        return (T)this.delegatedPriorityQueue.peek();
    }

    public boolean add(T toAdd) {
        boolean changed = this.delegatedPriorityQueue.add(toAdd);
        this.logAddition(Collections.singletonList(toAdd));
        return changed;
    }

    public boolean remove(T toRemove) {
        boolean removed = this.delegatedPriorityQueue.remove(toRemove);
        this.logRemoval(toRemove);
        return removed;
    }

    public boolean isEmpty() {
        return this.delegatedPriorityQueue.isEmpty();
    }

    public int size() {
        return this.delegatedPriorityQueue.size();
    }

    public void addAll(@Nullable Collection<? extends T> toAdd) {
        this.delegatedPriorityQueue.addAll(toAdd);
        this.logAddition(toAdd);
    }

    private void logAddition(Collection<? extends T> toAdd) {
        try {
            this.logger.valueElementAdded((ThrowingConsumer<DataOutputViewStreamWrapper, IOException>)((ThrowingConsumer)out -> {
                out.writeInt(toAdd.size());
                for (Object x : toAdd) {
                    this.serializer.serialize(x, (DataOutputView)out);
                }
            }), null);
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
    }

    @Nonnull
    public CloseableIterator<T> iterator() {
        return StateChangeLoggingIterator.create(this.delegatedPriorityQueue.iterator(), this.logger, (arg_0, arg_1) -> this.serializer.serialize(arg_0, arg_1), null);
    }

    @Override
    public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) {
        return factory.forPriorityQueue(this.delegatedPriorityQueue, this.serializer);
    }

    @Override
    public <IS> void setDelegatedState(IS state) {
        this.delegatedPriorityQueue = (KeyGroupedInternalPriorityQueue)Preconditions.checkNotNull(state);
    }

    public StateChangeLogger<T, Void> getStateChangeLogger() {
        return this.logger;
    }

    @Override
    public void resetWritingMetaFlag() {
        this.logger.resetWritingMetaFlag();
    }

    private void logRemoval(T toRemove) {
        try {
            this.logger.valueElementRemoved((ThrowingConsumer<DataOutputViewStreamWrapper, IOException>)((ThrowingConsumer)out -> this.serializer.serialize(toRemove, (DataOutputView)out)), null);
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
    }
}

