/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.state.changelog.AbstractStateChangeLogger;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.StateChangeOperation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/**
 * {@link KvStateChangeLogger} implementation that serializes state changes using a key
 * serializer, a namespace serializer and a value serializer.
 *
 * <p>The scope of every logged change is written as the current key (taken from the inherited
 * key context) followed by the namespace. The namespace serializer, value serializer, TTL
 * configuration and default value can be replaced after construction through the package-private
 * / protected setters.
 *
 * @param <Key> type of the state key
 * @param <Value> type of the state value
 * @param <Ns> type of the state namespace
 */
@NotThreadSafe
class KvStateChangeLoggerImpl<Key, Value, Ns> extends AbstractStateChangeLogger<Key, Value, Ns>
        implements KvStateChangeLogger<Value, Ns> {

    /** Serializer for namespaces; replaced by {@link #setMetaInfo}. */
    private TypeSerializer<Ns> namespaceSerializer;

    /** Serializer for the current key; fixed for the lifetime of this logger. */
    protected final TypeSerializer<Key> keySerializer;

    /** Serializer for state values; replaced by {@link #setMetaInfo}. */
    private TypeSerializer<Value> valueSerializer;

    /** TTL configuration written by {@link #writeDefaultValueAndTtl}. */
    private StateTtlConfig ttlConfig;

    /** Optional default value written by {@link #writeDefaultValueAndTtl}. */
    @Nullable
    private Value defaultValue;

    /**
     * Creates a logger.
     *
     * @param keySerializer serializer for keys, must not be null
     * @param namespaceSerializer serializer for namespaces, must not be null
     * @param valueSerializer serializer for values, must not be null
     * @param keyContext key context, forwarded to the superclass
     * @param stateChangelogWriter changelog writer, forwarded to the superclass
     * @param metaInfo state meta info, forwarded to the superclass
     * @param ttlConfig TTL configuration, must not be null
     * @param defaultValue default value of the state, may be null
     * @param stateId state id, forwarded to the superclass
     * @throws NullPointerException if any of the serializers or {@code ttlConfig} is null
     */
    KvStateChangeLoggerImpl(
            TypeSerializer<Key> keySerializer,
            TypeSerializer<Ns> namespaceSerializer,
            TypeSerializer<Value> valueSerializer,
            InternalKeyContext<Key> keyContext,
            StateChangelogWriter<?> stateChangelogWriter,
            RegisteredStateMetaInfoBase metaInfo,
            StateTtlConfig ttlConfig,
            @Nullable Value defaultValue,
            short stateId) {
        super(stateChangelogWriter, keyContext, metaInfo, stateId);
        this.keySerializer = Preconditions.checkNotNull(keySerializer);
        this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
        this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
        this.ttlConfig = Preconditions.checkNotNull(ttlConfig);
        this.defaultValue = defaultValue;
    }

    /**
     * Logs a {@code MERGE_NS} operation.
     *
     * <p>The payload is the serialized {@code target} namespace, the number of source namespaces
     * as an {@code int}, and then every source namespace in iteration order.
     *
     * @param target namespace the sources were merged into; also passed as the namespace of the
     *     logged change
     * @param sources namespaces that were merged
     * @throws IOException if logging or serialization fails
     */
    @Override
    public void namespacesMerged(Ns target, Collection<Ns> sources) throws IOException {
        // No raw cast on the lambda: the target type of log(...) gives `out` its
        // DataOutputView type, and the loop variable must be Ns to match the serializer.
        log(
                StateChangeOperation.MERGE_NS,
                out -> {
                    namespaceSerializer.serialize(target, out);
                    out.writeInt(sources.size());
                    for (Ns ns : sources) {
                        namespaceSerializer.serialize(ns, out);
                    }
                },
                target);
    }

    /**
     * Writes {@code value} to {@code out} with the current value serializer.
     *
     * @throws IOException if serialization fails
     */
    @Override
    protected void serializeValue(Value value, DataOutputView out) throws IOException {
        valueSerializer.serialize(value, out);
    }

    /**
     * Writes the scope of a change: the current key of the key context, then the namespace.
     *
     * @throws IOException if serialization fails
     */
    @Override
    protected void serializeScope(Ns ns, DataOutputView out) throws IOException {
        keySerializer.serialize(keyContext.getCurrentKey(), out);
        namespaceSerializer.serialize(ns, out);
    }

    /**
     * Writes the TTL configuration and the default value.
     *
     * <p>Layout: a boolean "TTL enabled" flag; if set, the Java-serialized {@link StateTtlConfig}
     * (written as raw bytes without a length prefix); a boolean "default value present" flag; if
     * set, the default value written via {@link #serializeValue}.
     *
     * @throws IOException if writing fails
     */
    @Override
    protected void writeDefaultValueAndTtl(DataOutputView out) throws IOException {
        final boolean ttlEnabled = ttlConfig.isEnabled();
        out.writeBoolean(ttlEnabled);
        if (ttlEnabled) {
            try (ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos();
                    ObjectOutputStream objectOut = new ObjectOutputStream(buffer)) {
                objectOut.writeObject(ttlConfig);
                // Make sure everything has reached the backing buffer before it is copied,
                // rather than relying on writeObject having drained its internal buffer.
                objectOut.flush();
                out.write(buffer.toByteArray());
            }
        }

        final boolean hasDefaultValue = defaultValue != null;
        out.writeBoolean(hasDefaultValue);
        if (hasDefaultValue) {
            serializeValue(defaultValue, out);
        }
    }

    /**
     * Updates the meta info in the superclass and switches to the namespace and state
     * serializers carried by {@code metaInfo}.
     *
     * @param metaInfo new meta info; cast to {@link RegisteredKeyValueStateBackendMetaInfo}
     * @return this logger
     * @throws ClassCastException if {@code metaInfo} is not a
     *     {@link RegisteredKeyValueStateBackendMetaInfo}
     */
    @Override
    protected KvStateChangeLoggerImpl<Key, Value, Ns> setMetaInfo(
            RegisteredStateMetaInfoBase metaInfo) {
        super.setMetaInfo(metaInfo);
        // The type arguments cannot be checked at runtime; this mirrors what the raw-typed
        // assignment did implicitly.
        @SuppressWarnings("unchecked")
        RegisteredKeyValueStateBackendMetaInfo<Ns, Value> kvMetaInfo =
                (RegisteredKeyValueStateBackendMetaInfo<Ns, Value>) metaInfo;
        this.namespaceSerializer = kvMetaInfo.getNamespaceSerializer();
        this.valueSerializer = kvMetaInfo.getStateSerializer();
        return this;
    }

    /**
     * Replaces the TTL configuration.
     *
     * <p>No null check is performed here, but {@link #writeDefaultValueAndTtl} dereferences the
     * configuration unconditionally, so callers are expected to pass a non-null value.
     *
     * @return this logger
     */
    KvStateChangeLoggerImpl<Key, Value, Ns> setStateTtlConfig(StateTtlConfig ttlConfig) {
        this.ttlConfig = ttlConfig;
        return this;
    }

    /**
     * Replaces the default value; {@code null} means "no default value".
     *
     * @return this logger
     */
    KvStateChangeLoggerImpl<Key, Value, Ns> setDefaultValue(Value defaultValue) {
        this.defaultValue = defaultValue;
        return this;
    }
}

