/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.consistency.persistent.impl;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.impl.BatchReadResponse;
import com.alibaba.nacos.naming.consistency.persistent.impl.BatchWriteRequest;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PersistentServiceProcessor
extends BasePersistentServiceProcessor {
    private final CPProtocol protocol;
    private volatile boolean hasLeader = false;

    public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersionJudgement versionJudgement) throws Exception {
        super(versionJudgement);
        this.protocol = protocolManager.getCpProtocol();
    }

    @Override
    public void afterConstruct() {
        super.afterConstruct();
        this.protocol.addRequestProcessors(Collections.singletonList(this));
        this.protocol.protocolMetaData().subscribe("naming_persistent_service", "leader", (o, arg) -> {
            this.hasLeader = StringUtils.isNotBlank((String)String.valueOf(arg));
        });
        if (((Boolean)EnvUtil.getProperty((String)"nacos.naming.use-new-raft.first", Boolean.class, (Object)false)).booleanValue()) {
            NotifyCenter.registerSubscriber((Subscriber)this.notifier);
            this.waitLeader();
            this.startNotify = true;
        }
    }

    private void waitLeader() {
        while (!this.hasLeader && !this.hasError) {
            Loggers.RAFT.info("Waiting Jraft leader vote ...");
            try {
                TimeUnit.MILLISECONDS.sleep(500L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        BatchWriteRequest req = new BatchWriteRequest();
        Datum datum = Datum.createDatum(key, value);
        req.append(ByteUtils.toBytes((String)key), this.serializer.serialize((Object)datum));
        WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom((byte[])this.serializer.serialize((Object)req))).setGroup("naming_persistent_service").setOperation(BasePersistentServiceProcessor.Op.Write.desc).build();
        try {
            this.protocol.write(request);
        }
        catch (Exception e) {
            throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
        }
    }

    @Override
    public void remove(String key) throws NacosException {
        BatchWriteRequest req = new BatchWriteRequest();
        req.append(ByteUtils.toBytes((String)key), ByteUtils.EMPTY);
        WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom((byte[])this.serializer.serialize((Object)req))).setGroup("naming_persistent_service").setOperation(BasePersistentServiceProcessor.Op.Delete.desc).build();
        try {
            this.protocol.write(request);
        }
        catch (Exception e) {
            throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
        }
    }

    @Override
    public Datum get(String key) throws NacosException {
        ArrayList<byte[]> keys = new ArrayList<byte[]>(1);
        keys.add(ByteUtils.toBytes((String)key));
        ReadRequest req = ReadRequest.newBuilder().setGroup("naming_persistent_service").setData(ByteString.copyFrom((byte[])this.serializer.serialize(keys))).build();
        try {
            Response resp = this.protocol.getData(req);
            if (resp.getSuccess()) {
                BatchReadResponse response = (BatchReadResponse)this.serializer.deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
                List<byte[]> rValues = response.getValues();
                return rValues.isEmpty() ? null : (Datum)this.serializer.deserialize(rValues.get(0), this.getDatumTypeFromKey(key));
            }
            throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
        }
        catch (Throwable e) {
            throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage());
        }
    }

    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        this.notifier.registerListener(key, listener);
        if (this.startNotify) {
            this.notifierDatumIfAbsent(key, listener);
        }
    }

    @Override
    public void unListen(String key, RecordListener listener) throws NacosException {
        this.notifier.deregisterListener(key, listener);
    }

    @Override
    public boolean isAvailable() {
        return this.hasLeader && !this.hasError;
    }
}

