/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.state.changelog.AbstractStateChangeLogger;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.StateChangeOperation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/**
 * {@link KvStateChangeLogger} implementation that serializes keys, namespaces and values with
 * the supplied {@link TypeSerializer}s and hands the result to the logging facilities of
 * {@link AbstractStateChangeLogger}.
 *
 * <p>The scope of a change is written as the current key of the key context followed by the
 * namespace (see {@link #serializeScope}).
 *
 * @param <Key> type of the state key
 * @param <Value> type of the state value
 * @param <Ns> type of the state namespace
 */
@NotThreadSafe
class KvStateChangeLoggerImpl<Key, Value, Ns> extends AbstractStateChangeLogger<Key, Value, Ns>
        implements KvStateChangeLogger<Value, Ns> {

    private final TypeSerializer<Ns> namespaceSerializer;
    protected final TypeSerializer<Key> keySerializer;
    private final TypeSerializer<Value> valueSerializer;
    private final StateTtlConfig ttlConfig;

    /** Default value of the state; {@code null} means "no default value". */
    @Nullable
    private final Value defaultValue;

    /**
     * Creates a logger for a single key-value state.
     *
     * @param keySerializer serializer for the current key; must not be {@code null}
     * @param namespaceSerializer serializer for namespaces; must not be {@code null}
     * @param valueSerializer serializer for values; must not be {@code null}
     * @param keyContext source of the current key, forwarded to the superclass
     * @param stateChangelogWriter writer forwarded to the superclass
     * @param metaInfo state meta information forwarded to the superclass
     * @param ttlConfig TTL configuration written as part of the metadata; must not be
     *     {@code null}
     * @param defaultValue optional default value written as part of the metadata
     */
    KvStateChangeLoggerImpl(
            TypeSerializer<Key> keySerializer,
            TypeSerializer<Ns> namespaceSerializer,
            TypeSerializer<Value> valueSerializer,
            InternalKeyContext<Key> keyContext,
            StateChangelogWriter<?> stateChangelogWriter,
            RegisteredStateMetaInfoBase metaInfo,
            StateTtlConfig ttlConfig,
            @Nullable Value defaultValue) {
        super(stateChangelogWriter, keyContext, metaInfo);
        // checkNotNull is generic, so no (raw) casts are needed on its result.
        this.keySerializer = Preconditions.checkNotNull(keySerializer);
        this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
        this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
        this.ttlConfig = Preconditions.checkNotNull(ttlConfig);
        this.defaultValue = defaultValue;
    }

    /**
     * Logs a {@link StateChangeOperation#MERGE_NS} operation.
     *
     * <p>The payload consists of the target namespace, the number of source namespaces and then
     * every source namespace in the iteration order of {@code sources}.
     *
     * @param target namespace the sources were merged into
     * @param sources namespaces that were merged
     * @throws IOException if serialization or logging fails
     */
    @Override
    public void namespacesMerged(Ns target, Collection<Ns> sources) throws IOException {
        // Explicitly typed local keeps the lambda fully parameterized (no raw ThrowingConsumer).
        ThrowingConsumer<DataOutputViewStreamWrapper, IOException> payloadWriter =
                out -> {
                    this.namespaceSerializer.serialize(target, out);
                    out.writeInt(sources.size());
                    for (Ns source : sources) {
                        this.namespaceSerializer.serialize(source, out);
                    }
                };
        this.log(StateChangeOperation.MERGE_NS, payloadWriter, target);
    }

    /**
     * Writes {@code value} to {@code out} using the value serializer.
     *
     * @throws IOException if the serializer fails to write
     */
    @Override
    protected void serializeValue(Value value, DataOutputViewStreamWrapper out)
            throws IOException {
        this.valueSerializer.serialize(value, out);
    }

    /**
     * Writes the scope of a change: the current key of the key context, then the namespace.
     *
     * @throws IOException if a serializer fails to write
     */
    @Override
    protected void serializeScope(Ns ns, DataOutputViewStreamWrapper out) throws IOException {
        this.keySerializer.serialize(this.keyContext.getCurrentKey(), out);
        this.namespaceSerializer.serialize(ns, out);
    }

    /**
     * Writes the TTL configuration and the default value to {@code out}.
     *
     * <p>Layout: a boolean "TTL enabled" flag, followed (only if enabled) by the Java-serialized
     * {@link StateTtlConfig}; then a boolean "default value present" flag, followed (only if
     * present) by the default value written with the value serializer.
     *
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper out) throws IOException {
        boolean ttlEnabled = this.ttlConfig.isEnabled();
        out.writeBoolean(ttlEnabled);
        if (ttlEnabled) {
            // Deliberately NOT closed: closing the ObjectOutputStream would also close `out`,
            // which is owned by the caller and is still written to below. Flushing pushes the
            // buffered object data into `out` and produces the same bytes as close() would.
            ObjectOutputStream ttlStream = new ObjectOutputStream(out);
            ttlStream.writeObject(this.ttlConfig);
            ttlStream.flush();
        }

        boolean hasDefaultValue = this.defaultValue != null;
        out.writeBoolean(hasDefaultValue);
        if (hasDefaultValue) {
            this.serializeValue(this.defaultValue, out);
        }
    }
}

