package io.dingodb.store.proxy.meta;

import io.dingodb.cluster.ClusterService;
import io.dingodb.common.CommonId;
import io.dingodb.common.Location;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.concurrent.LinkedRunner;
import io.dingodb.common.config.DingoConfiguration;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.util.Optional;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.NetService;
import io.dingodb.net.api.ApiRegistry;
import io.dingodb.net.service.ListenService;
import io.dingodb.sdk.service.LockService;
import io.dingodb.sdk.service.Services;
import io.dingodb.sdk.service.VersionService;
import io.dingodb.sdk.service.entity.common.KeyValue;
import io.dingodb.sdk.service.entity.meta.AddIndexOnTableRequest;
import io.dingodb.sdk.service.entity.meta.CreateIndexRequest;
import io.dingodb.sdk.service.entity.meta.CreateSchemaRequest;
import io.dingodb.sdk.service.entity.meta.CreateTablesRequest;
import io.dingodb.sdk.service.entity.meta.DropIndexOnTableRequest;
import io.dingodb.sdk.service.entity.meta.DropIndexRequest;
import io.dingodb.sdk.service.entity.meta.DropSchemaRequest;
import io.dingodb.sdk.service.entity.meta.DropTablesRequest;
import io.dingodb.sdk.service.entity.meta.UpdateIndexRequest;
import io.dingodb.sdk.service.entity.version.Kv;
import io.dingodb.sdk.service.entity.version.RangeRequest;
import io.dingodb.sdk.service.entity.version.RangeResponse;
import io.dingodb.store.proxy.Configuration;
import io.dingodb.store.proxy.service.TableLockService;
import io.dingodb.transaction.api.LockType;
import io.dingodb.transaction.api.TableLock;
import io.dingodb.tso.TsoService;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/store/proxy/meta/MetaServiceApiImpl.class */
public class MetaServiceApiImpl implements MetaServiceApi {
    private static final long participantJoin = 0;
    private CommonId leaderId;
    private Channel leaderChannel;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MetaServiceApiImpl.class);
    private static final ApiRegistry apis = ApiRegistry.getDefault();
    private static final CommonId participantJoinCommonId = new CommonId(CommonId.CommonType.CLUSTER, 0, 0);
    private static final Consumer<Message> participantJoinListener = ListenService.getDefault().register(participantJoinCommonId, null);
    private static final CommonId ID = DingoConfiguration.serverId();
    public static final MetaServiceApiImpl INSTANCE = new MetaServiceApiImpl();
    private final io.dingodb.sdk.service.MetaService proxyService = Services.metaService(Configuration.coordinatorSet());
    private final LockService lockService = new LockService("MetaNode", Configuration.coordinators());
    private final VersionService versionService = Services.versionService(Configuration.coordinatorSet());
    private final Map<CommonId, Location> participantLocations = new ConcurrentHashMap();
    private final Map<CommonId, Channel> participantChannels = new ConcurrentHashMap();
    private final Map<String, TableLock> tableLocks = new ConcurrentHashMap();
    private boolean needLock = true;
    private final LinkedRunner lockRunner = new LinkedRunner("lock-runner");
    private LockService.Lock lock = this.lockService.newLock(ID + "#" + DingoConfiguration.location().url());

    private MetaServiceApiImpl() {
        this.lock.watchDestroy().thenRunAsync(() -> {
            this.lock = this.lockService.newLock(ID + "#" + DingoConfiguration.location().url());
            lock();
        }, Executors.executor("meta-lock"));
        apis.register((Class<Class>) MetaServiceApi.class, (Class) this);
        Executors.execute("meta-lock", this::lock, true);
    }

    public void close() {
        this.lockService.close();
    }

    private void retryLock() {
        this.needLock = true;
        this.lockRunner.forceFollow(this::lock);
    }

    private void lock() {
        if (this.needLock) {
            LogUtils.info(log, "Meta lock start...", new Object[0]);
            this.leaderId = null;
            this.leaderChannel = null;
            try {
                if (ID.type == CommonId.CommonType.SDK || !this.lock.tryLock()) {
                    Kv currentLock = this.lockService.currentLock();
                    String[] split = new String(currentLock.getKv().getValue()).split("#");
                    CommonId parse = CommonId.parse(split[0]);
                    Location parseUrl = Location.parseUrl(split[1]);
                    if (ID.equals(parse) || parseUrl.equals(DingoConfiguration.location())) {
                        LogUtils.info(log, "Old leader location equals current location, but id not equals, old id: {}, current id: {}.", ID, parse);
                        this.lockService.delete(ID.seq, new String(currentLock.getKv().getKey()));
                        retryLock();
                        return;
                    }
                    this.leaderChannel = NetService.getDefault().newChannel(parseUrl);
                    this.leaderChannel.setCloseListener(channel -> {
                        retryLock();
                    });
                    proxy(this.leaderChannel).connect(null, ID, DingoConfiguration.location());
                    scanTableLock();
                    this.lockService.watchLock(currentLock, this::retryLock);
                    this.leaderId = parse;
                    LogUtils.info(log, "Current {}, leader: {}.", ID, parse);
                    this.needLock = false;
                } else {
                    scanTableLock();
                    LogUtils.info(log, "Become leader, id {}.", ID);
                    this.lock.watchDestroy().thenRun(this::retryLock);
                }
            } catch (Exception e) {
                if (this.leaderChannel != null) {
                    this.leaderChannel.close();
                }
                LogUtils.error(log, "Meta lock error, will retry.", e);
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                retryLock();
            }
        }
    }

    public boolean isLeader() {
        return this.lock.getLocked() > 0 && !this.lock.watchDestroy().isDone();
    }

    public CommonId leader() {
        return this.leaderId;
    }

    public Channel leaderChannel() {
        if (this.leaderChannel != null && !this.leaderChannel.isClosed()) {
            return this.leaderChannel.cloneChannel();
        }
        retryLock();
        throw new RuntimeException("Offline, please wait and retry.");
    }

    public boolean isReady() {
        return isLeader() || this.leaderId != null;
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public synchronized void connect(Channel channel, CommonId commonId, Location location) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        Optional.ofNullable(this.participantChannels.remove(commonId)).ifPresent(channel2 -> {
            channel2.setCloseListener(null);
        }).ifPresent((v0) -> {
            v0.close();
        });
        this.participantChannels.put(commonId, channel);
        this.participantLocations.put(commonId, location);
        LogUtils.info(log, "Participant {} join.", commonId);
        channel.setCloseListener(channel3 -> {
            this.participantChannels.remove(commonId);
            LogUtils.info(log, "Participant {} leave.", commonId);
        });
        participantJoinListener.accept(new Message(commonId.encode()));
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public Location getLocation(CommonId commonId) {
        Location location = this.participantLocations.get(commonId);
        if (location == null) {
            location = ClusterService.getDefault().getLocation(commonId);
        }
        return location;
    }

    private MetaServiceApi proxy(Location location) {
        return (MetaServiceApi) apis.proxy(MetaServiceApi.class, location);
    }

    private MetaServiceApi proxy(Channel channel) {
        return (MetaServiceApi) apis.proxy(MetaServiceApi.class, channel);
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [io.dingodb.sdk.service.entity.version.RangeRequest$RangeRequestBuilder] */
    private void scanTableLock() {
        byte[] bytes = "Lock".getBytes(StandardCharsets.UTF_8);
        byte[] copyOf = Arrays.copyOf(bytes, bytes.length);
        int length = copyOf.length - 1;
        int length2 = copyOf.length - 1;
        byte b = (byte) (copyOf[length2] + 1);
        copyOf[length2] = b;
        copyOf[length] = b;
        RangeRequest build = RangeRequest.builder().key(bytes).rangeEnd(copyOf).build();
        RangeResponse kvRange = this.versionService.kvRange(TsoService.getDefault().tso(), build);
        do {
            List<Kv> kvs = kvRange.getKvs();
            if (kvs != null && !kvs.isEmpty()) {
                Iterator<Kv> it = kvs.iterator();
                while (it.hasNext()) {
                    String[] split = new String(it.next().getKv().getKey(), StandardCharsets.UTF_8).split("\\|");
                    syncTableLock(TableLock.builder().tableId(CommonId.parse(split[1])).serverId(CommonId.parse(split[2])).lockTs(Long.parseLong(split[3])).currentTs(Long.parseLong(split[4])).type(LockType.TABLE).build());
                }
            }
        } while (kvRange.isMore());
    }

    /* JADX WARN: Type inference failed for: r1v20, types: [io.dingodb.sdk.service.entity.version.Kv$KvBuilder] */
    /* JADX WARN: Type inference failed for: r2v7, types: [io.dingodb.sdk.service.entity.common.KeyValue$KeyValueBuilder] */
    private void pubTableLock(TableLock tableLock) {
        String str = "Lock|" + tableLock.tableId + "|" + tableLock.serverId + "|" + tableLock.lockTs + "|" + tableLock.currentTs;
        LogUtils.info(log, "Pub table lock [{}].", str);
        this.lockService.put(tableLock.lockTs, str, null);
        LogUtils.info(log, "Pub table lock [{}] success, add lock watch.", str);
        tableLock.unlockFuture.thenRunAsync(() -> {
            this.lockService.delete(tableLock.lockTs, str);
            LogUtils.info(log, "Unlock table lock [{}] success.", str);
        });
        this.lockService.watchLock(Kv.builder().kv(KeyValue.builder().key(str.getBytes(StandardCharsets.UTF_8)).build()).build(), () -> {
            if (tableLock.unlockFuture.isDone()) {
                return;
            }
            LogUtils.warn(log, "Lose table lock {}.", str);
            tableLock.unlockFuture.completeExceptionally(new UnknownError("Lose table lock key " + str));
        });
        LogUtils.info(log, "pubTableLock end, lock:{}", tableLock);
    }

    /* JADX WARN: Type inference failed for: r1v16, types: [io.dingodb.sdk.service.entity.version.Kv$KvBuilder] */
    /* JADX WARN: Type inference failed for: r2v3, types: [io.dingodb.sdk.service.entity.common.KeyValue$KeyValueBuilder] */
    private synchronized boolean subTableLock(TableLock tableLock) {
        String str = "Lock|" + tableLock.tableId + "|" + tableLock.serverId + "|" + tableLock.lockTs + "|" + tableLock.currentTs;
        LogUtils.info(log, "Sub table lock [{}] success, add lock watch.", str);
        if (this.tableLocks.containsKey(str)) {
            LogUtils.info(log, "Table lock {} exist, skip subscribe.", str);
            return false;
        }
        this.lockService.watchLock(Kv.builder().kv(KeyValue.builder().key(str.getBytes(StandardCharsets.UTF_8)).build()).build(), () -> {
            tableLock.unlockFuture.complete(null);
        });
        this.tableLocks.put(str, tableLock);
        tableLock.unlockFuture.thenRun(() -> {
            this.tableLocks.remove(str);
            LogUtils.info(log, "Table lock {} unlock.", str);
        });
        LogUtils.info(log, "subTableLock end, lock:{}", tableLock);
        return true;
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void lockTable(long j, TableLock tableLock) {
        if (tableLock.serverId.equals(ID)) {
            pubTableLock(tableLock);
        }
        if (isLeader()) {
            broadcastTableLock(j, tableLock);
        } else if (tableLock.serverId.equals(ID)) {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).syncTableLock(tableLock);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void syncTableLock(TableLock tableLock) {
        if (tableLock.type != LockType.TABLE && tableLock.type != LockType.RANGE) {
            throw new RuntimeException("Supported only table and range.");
        }
        if (tableLock.serverId.equals(ID)) {
            LogUtils.warn(log, "Remote lock request, but server equals current server id, lock: {}.", tableLock);
            return;
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        TableLock build = TableLock.builder().serverId(tableLock.serverId).lockTs(tableLock.lockTs).currentTs(tableLock.currentTs).tableId(tableLock.tableId).type(tableLock.type).lockFuture(completableFuture).unlockFuture(completableFuture2).build();
        try {
            LogUtils.info(log, "syncTableLock lock:{}", build);
            if (subTableLock(build)) {
                LogUtils.info(log, "syncTableLock doLock:{}", build);
                TableLockService.INSTANCE.doLock(build);
                completableFuture.get(3L, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            completableFuture.cancel(true);
            completableFuture2.complete(null);
            throw e;
        }
    }

    private void broadcastTableLock(long j, TableLock tableLock) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        LogUtils.info(log, "broadcastTableLock requestId:{}, lock:{}", Long.valueOf(j), tableLock);
        for (Map.Entry<CommonId, Channel> entry : this.participantChannels.entrySet()) {
            if (!tableLock.serverId.equals(entry.getKey())) {
                Channel cloneChannel = entry.getValue().cloneChannel();
                Throwable th = null;
                try {
                    try {
                        proxy(cloneChannel).syncTableLock(tableLock);
                        if (cloneChannel != null) {
                            if (0 != 0) {
                                try {
                                    cloneChannel.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                cloneChannel.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void createTables(long j, String str, String str2, CreateTablesRequest createTablesRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.createTables(j, createTablesRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).createTables(j, str, str2, createTablesRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void dropTables(long j, String str, String str2, DropTablesRequest dropTablesRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.dropTables(j, dropTablesRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).dropTables(j, str, str2, dropTablesRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void createSchema(long j, String str, CreateSchemaRequest createSchemaRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.createSchema(j, createSchemaRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).createSchema(j, str, createSchemaRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void dropSchema(long j, String str, DropSchemaRequest dropSchemaRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.dropSchema(j, dropSchemaRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).dropSchema(j, str, dropSchemaRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void createIndex(long j, String str, String str2, String str3, CreateIndexRequest createIndexRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.createIndex(j, createIndexRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).createIndex(j, str, str2, str3, createIndexRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void addIndexOnTable(long j, String str, AddIndexOnTableRequest addIndexOnTableRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.addIndexOnTable(j, addIndexOnTableRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).addIndexOnTable(j, str, addIndexOnTableRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void updateIndex(long j, String str, UpdateIndexRequest updateIndexRequest) throws Exception {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.updateIndex(j, updateIndexRequest);
            return;
        }
        Channel leaderChannel = leaderChannel();
        Throwable th = null;
        try {
            try {
                proxy(leaderChannel).updateIndex(j, str, updateIndexRequest);
                if (leaderChannel != null) {
                    if (0 == 0) {
                        leaderChannel.close();
                        return;
                    }
                    try {
                        leaderChannel.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (leaderChannel != null) {
                if (th != null) {
                    try {
                        leaderChannel.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    leaderChannel.close();
                }
            }
            throw th4;
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void dropIndex(long j, String str, DropIndexRequest dropIndexRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.dropIndex(j, dropIndexRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).dropIndex(j, str, dropIndexRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    @Override // io.dingodb.store.proxy.meta.MetaServiceApi
    public void dropIndexOnTable(long j, String str, DropIndexOnTableRequest dropIndexOnTableRequest) {
        if (!isReady()) {
            throw new RuntimeException("Offline, please wait and retry.");
        }
        if (isLeader()) {
            this.proxyService.dropIndexOnTable(j, dropIndexOnTableRequest);
        } else {
            Channel leaderChannel = leaderChannel();
            Throwable th = null;
            try {
                try {
                    proxy(leaderChannel).dropIndexOnTable(j, str, dropIndexOnTableRequest);
                    if (leaderChannel != null) {
                        if (0 != 0) {
                            try {
                                leaderChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            leaderChannel.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
    }
}
