package io.dingodb.server.client.connector.impl;

import io.dingodb.common.Location;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.error.CommonError;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.common.util.NoBreakFunctions;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.NetService;
import io.dingodb.net.NetServiceProvider;
import io.dingodb.server.api.CoordinatorServerApi;
import io.dingodb.server.client.config.ClientConfiguration;
import io.dingodb.server.client.connector.Connector;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/server/client/connector/impl/CoordinatorConnector.class */
/**
 * Connector that tracks the channel to the current coordinator leader and opens new channels to
 * the leader's location on request.
 *
 * <p>Leader discovery runs asynchronously through {@link #refresh()}: each known coordinator
 * address is asked for the leader until a channel to it is established. Channels to the other
 * known coordinators are kept in {@code listenLeaderChannels}; a message received on one of them
 * is treated as a leader announcement from the sending node.
 *
 * <p>State is touched from executor tasks and channel callbacks, so the shared collections are
 * concurrent and the leader switch itself is serialized by {@link #leaderChange(Channel)}.
 */
public class CoordinatorConnector implements Connector, Supplier<Location> {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorConnector.class);
    private static final CoordinatorConnector DEFAULT_CONNECTOR;

    private final NetService netService = ServiceLoader.load(NetServiceProvider.class).iterator().next().get();
    /** Channel to the current leader, or {@code null} while no leader channel is held. */
    private final AtomicReference<Channel> leaderChannel = new AtomicReference<>();
    private final AtomicReference<Location> leaderAddress = new AtomicReference<>();
    /**
     * Known coordinator addresses. A concurrent set is required because it is iterated by
     * {@link #initChannels()} while channel callbacks and {@link #connectedLeader(Channel)} add to it.
     */
    private final Set<Location> coordinatorAddresses = ConcurrentHashMap.newKeySet();
    /** Channels opened to the known coordinators other than the leader, keyed by their location. */
    private final Map<Location, Channel> listenLeaderChannels = new ConcurrentHashMap<>();
    /** {@link System#currentTimeMillis()} of the last leader channel update; only written in this class. */
    private long lastUpdateLeaderTime;
    /** {@link System#currentTimeMillis()} of the last pass over the non-leader channels; only written in this class. */
    private long lastUpdateNotLeaderChannelsTime;

    /**
     * Returns the connector built from the client configuration.
     *
     * @return the shared connector, or {@code null} when {@code ClientConfiguration.instance()}
     *     was {@code null} at class initialization
     */
    public static CoordinatorConnector defaultConnector() {
        return DEFAULT_CONNECTOR;
    }

    /**
     * Creates a connector seeded with the given coordinator addresses and schedules an
     * asynchronous leader lookup.
     *
     * @param list initial coordinator addresses
     */
    public CoordinatorConnector(List<Location> list) {
        this.coordinatorAddresses.addAll(list);
        refresh();
    }

    /**
     * Opens a new channel to the current leader, waiting for a leader as described in {@link #get()}.
     *
     * @return a new channel to the leader's location
     */
    @Override // io.dingodb.server.client.connector.Connector
    public Channel newChannel() {
        // Use the location resolved by get() rather than re-reading leaderChannel, which a
        // concurrent close callback may have cleared in the meantime.
        return this.netService.newChannel(get());
    }

    /**
     * Reports whether a leader channel is held and its status is {@code ACTIVE}.
     *
     * @return {@code true} when the leader channel is usable
     */
    @Override // io.dingodb.server.client.connector.Connector
    public boolean verify() {
        return activeLeader() != null;
    }

    /** Schedules an asynchronous attempt to (re)connect to the coordinator leader. */
    @Override // io.dingodb.server.client.connector.Connector
    public void refresh() {
        Executors.submit("coordinator-connector-refresh", this::initChannels);
    }

    /**
     * Returns the location of the current leader, waiting for one to become available.
     *
     * <p>While no active leader channel exists, sleeps and triggers {@link #refresh()} up to ten
     * times. If the waiting thread is interrupted, its interrupt flag is restored and waiting
     * stops immediately.
     *
     * @return the remote location of the active leader channel
     */
    @Override // java.util.function.Supplier
    public Location get() {
        int attemptsLeft = 10;
        int sleepMillis = 200;
        Channel leader = activeLeader();
        while (leader == null && attemptsLeft-- > 0) {
            try {
                Thread.sleep(sleepMillis);
                refresh();
                // NOTE(review): the multiplier is the attempt number, so the delay grows
                // factorially (200 ms, 200 ms, 400 ms, 1.2 s, 4.8 s, 24 s, ...) - confirm intended.
                sleepMillis *= 10 - attemptsLeft;
            } catch (InterruptedException e) {
                log.error("Wait coordinator connector ready, but interrupted.");
                // Preserve the interruption for callers and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            leader = activeLeader();
        }
        if (leader == null) {
            // Presumably always throws; if it ever returned, the dereference below would fail.
            CommonError.EXEC_TIMEOUT.throwFormatError("wait connector available", Thread.currentThread().getName(), "");
        }
        return leader.remoteLocation();
    }

    /** Returns the leader channel if one is held and its status is {@code ACTIVE}, otherwise {@code null}. */
    private Channel activeLeader() {
        // Single read of the reference so a concurrent close cannot null it between checks.
        Channel channel = this.leaderChannel.get();
        return channel != null && channel.status() == Channel.Status.ACTIVE ? channel : null;
    }

    /**
     * Asks each known coordinator for the leader location and connects to the first answer.
     * Stops as soon as an active leader channel exists or one connection attempt has been made;
     * a failing address is logged and the next one is tried.
     */
    private void initChannels() {
        for (Location location : this.coordinatorAddresses) {
            if (verify()) {
                return;
            }
            try {
                CoordinatorServerApi api = (CoordinatorServerApi) this.netService.apiRegistry()
                    .proxy(CoordinatorServerApi.class, () -> location);
                connectedLeader(this.netService.newChannel(api.leader()));
                return;
            } catch (Exception e) {
                log.error("Open coordinator channel error, address: {}", location, e);
            }
        }
    }

    /**
     * Message callback installed by {@link #connectFollow(Location)}: records the coordinator's
     * address and switches the channel's listeners to close and leader tracking.
     */
    private void connected(Message message, Channel channel) {
        log.info("Connected coordinator [{}] channel.", channel.remoteLocation());
        this.coordinatorAddresses.add(channel.remoteLocation());
        channel.setCloseListener(this::listenClose);
        channel.setMessageListener(this::listenLeader);
    }

    /**
     * Installs {@code channel} as the leader channel and, on success, fetches the full
     * coordinator list from it and opens channels to every other coordinator.
     * If the leader did not change, the new channel is closed. Failures are logged, not thrown.
     */
    private void connectedLeader(Channel channel) {
        try {
            if (!leaderChange(channel)) {
                channel.close();
                return;
            }
            this.lastUpdateLeaderTime = System.currentTimeMillis();
            CoordinatorServerApi api = (CoordinatorServerApi) this.netService.apiRegistry()
                .proxy(CoordinatorServerApi.class, channel::remoteLocation);
            this.coordinatorAddresses.addAll(api.getAll().stream()
                .map(location -> new Location(location.getHost(), location.getPort()))
                .collect(Collectors.toList()));
            Location leaderLocation = channel.remoteLocation();
            for (Location address : this.coordinatorAddresses) {
                if (address.equals(leaderLocation)) {
                    continue;
                }
                // Open each non-leader channel asynchronously; computeIfAbsent keeps one per location.
                Executors.submit("CoordinatorConnector", () -> {
                    return this.listenLeaderChannels.computeIfAbsent(address, NoBreakFunctions.wrap(this::connectFollow, (Consumer<Throwable>) th -> {
                        log.error("Open follow channel error, address: {}", address, th);
                    }));
                });
            }
            this.lastUpdateNotLeaderChannelsTime = System.currentTimeMillis();
            log.info("Connected coordinator leader success, remote: [{}]", channel.remoteLocation());
        } catch (Exception e) {
            log.error("Connected coordinator leader error, address: {}", channel, e);
        }
    }

    /**
     * Opens a channel to a non-leader coordinator and sends it an empty {@code RAFT_SERVICE}
     * message; the first reply is handled by {@link #connected(Message, Channel)}.
     *
     * @param location coordinator address to connect to
     * @return the newly opened channel
     */
    @Nonnull
    private Channel connectFollow(Location location) {
        Channel newChannel = this.netService.newChannel(location);
        newChannel.setMessageListener(this::connected);
        newChannel.send(new Message("RAFT_SERVICE", ByteArrayUtils.EMPTY_BYTES));
        log.info("Open coordinator channel, address: [{}]", location);
        return newChannel;
    }

    /** Closes the channel, logging instead of propagating any failure. */
    private void closeChannel(Channel channel) {
        try {
            channel.close();
        } catch (Exception e) {
            log.error("Close coordinator channel error, address: [{}].", channel.remoteLocation(), e);
        }
    }

    /** Message callback: any message on a tracked channel makes that channel the leader candidate. */
    private void listenLeader(Message message, Channel channel) {
        log.info("Receive leader message from [{}]", channel.remoteLocation().getUrl());
        leaderChange(channel);
    }

    /**
     * Replaces the leader channel with {@code channel} unless the current leader channel is
     * active and already points at the same remote location. The previous channel, if any, is closed.
     *
     * @param channel candidate leader channel
     * @return {@code true} if {@code channel} became the leader channel
     */
    private synchronized boolean leaderChange(Channel channel) {
        Channel previous = this.leaderChannel.get();
        if (previous != null && previous.isActive() && channel.remoteLocation().equals(previous.remoteLocation())) {
            log.info("Coordinator leader not changed, remote: {}", channel.remoteLocation());
            return false;
        }
        // The second attempt covers listenClose clearing the reference after it was read above.
        if (!this.leaderChannel.compareAndSet(previous, channel) && !this.leaderChannel.compareAndSet(null, channel)) {
            return false;
        }
        this.lastUpdateLeaderTime = System.currentTimeMillis();
        log.info("Coordinator leader channel changed, new leader remote: [{}], old leader remote: [{}]", channel.remoteLocation(), previous == null ? null : previous.remoteLocation());
        if (previous != null) {
            closeChannel(previous);
        }
        channel.setCloseListener(this::listenClose);
        return true;
    }

    /**
     * Close callback: forgets the channel for that location and clears the leader reference
     * if the closed channel was the leader channel.
     */
    private void listenClose(Channel channel) {
        this.listenLeaderChannels.remove(channel.remoteLocation());
        log.info("Coordinator channel closed, remote: [{}]", channel.remoteLocation());
        if (this.leaderChannel.compareAndSet(channel, null)) {
            this.lastUpdateLeaderTime = System.currentTimeMillis();
            log.info("Coordinator leader channel closed, remote: [{}].", channel.remoteLocation());
        }
    }

    /**
     * Parses a comma-separated list of {@code host:port} entries, ignoring whitespace around
     * each entry and around its host and port.
     *
     * @param addresses comma-separated {@code host:port} list
     * @return the parsed locations, in input order
     */
    private static List<Location> parseLocations(String addresses) {
        return Arrays.stream(addresses.split(","))
            .map(entry -> entry.trim().split(":"))
            .map(parts -> new Location(parts[0].trim(), Integer.parseInt(parts[1].trim())))
            .collect(Collectors.toList());
    }

    static {
        if (ClientConfiguration.instance() == null) {
            DEFAULT_CONNECTOR = null;
        } else {
            DEFAULT_CONNECTOR = new CoordinatorConnector(parseLocations(ClientConfiguration.coordinatorExchangeSvrList()));
        }
    }
}
