/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.impl.zookeeper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.pulsar.shade.org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;

public class ZKMetadataStore
implements MetadataStore {
    private final boolean isZkManaged;
    private final ZooKeeper zkc;
    private final ExecutorService executor;

    public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws IOException {
        try {
            this.isZkManaged = true;
            this.zkc = ZooKeeperClient.newBuilder().connectString(metadataURL).connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100L, 60000L, Integer.MAX_VALUE)).allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations()).sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()).build();
        }
        catch (InterruptedException | KeeperException e) {
            throw new IOException(e);
        }
        this.executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("zk-metadata-store-callback"));
    }

    @VisibleForTesting
    public ZKMetadataStore(ZooKeeper zkc) {
        this.isZkManaged = false;
        this.zkc = zkc;
        this.executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("zk-metadata-store-callback"));
    }

    @Override
    public CompletableFuture<Optional<GetResult>> get(String path) {
        CompletableFuture<Optional<GetResult>> future = new CompletableFuture<Optional<GetResult>>();
        try {
            this.zkc.getData(path, null, (rc, path1, ctx, data, stat) -> this.executor.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    future.complete(Optional.of(new GetResult(data, ZKMetadataStore.getStat(stat))));
                } else if (code == KeeperException.Code.NONODE) {
                    future.complete(Optional.empty());
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }), null);
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    @Override
    public CompletableFuture<List<String>> getChildren(String path) {
        CompletableFuture<List<String>> future = new CompletableFuture<List<String>>();
        try {
            this.zkc.getChildren(path, null, (rc, path1, ctx, children) -> this.executor.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    Collections.sort(children);
                    future.complete(children);
                } else if (code == KeeperException.Code.NONODE) {
                    ((CompletableFuture)this.exists(path).thenAccept(exists -> {
                        if (exists.booleanValue()) {
                            ((CompletableFuture)this.getChildren(path).thenAccept(c -> future.complete((List<String>)c))).exceptionally(ex -> {
                                future.completeExceptionally((Throwable)ex);
                                return null;
                            });
                        } else {
                            future.complete(Collections.emptyList());
                        }
                    })).exceptionally(ex -> {
                        future.completeExceptionally((Throwable)ex);
                        return null;
                    });
                    future.complete(Collections.emptyList());
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }), null);
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    @Override
    public CompletableFuture<Boolean> exists(String path) {
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        try {
            this.zkc.exists(path, null, (rc, path1, ctx, stat) -> this.executor.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    future.complete(true);
                } else if (code == KeeperException.Code.NONODE) {
                    future.complete(false);
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }), future);
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    @Override
    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> optExpectedVersion) {
        boolean hasVersion = optExpectedVersion.isPresent();
        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
        CompletableFuture<Stat> future = new CompletableFuture<Stat>();
        try {
            if (hasVersion && expectedVersion == -1) {
                ZkUtils.asyncCreateFullPathOptimistic(this.zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> this.executor.execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        future.complete(new Stat(0L, 0L, 0L));
                    } else if (code == KeeperException.Code.NODEEXISTS) {
                        future.completeExceptionally(ZKMetadataStore.getException(KeeperException.Code.BADVERSION, path));
                    } else {
                        future.completeExceptionally(ZKMetadataStore.getException(code, path));
                    }
                }), null);
            } else {
                this.zkc.setData(path, value, expectedVersion, (rc, path1, ctx, stat) -> this.executor.execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        future.complete(ZKMetadataStore.getStat(stat));
                    } else if (code == KeeperException.Code.NONODE) {
                        if (hasVersion) {
                            future.completeExceptionally(ZKMetadataStore.getException(KeeperException.Code.BADVERSION, path));
                        } else {
                            ((CompletableFuture)this.put(path, value, Optional.of(-1L)).thenAccept(s -> future.complete((Stat)s))).exceptionally(ex -> {
                                future.completeExceptionally(ex.getCause());
                                return null;
                            });
                        }
                    } else {
                        future.completeExceptionally(ZKMetadataStore.getException(code, path));
                    }
                }), null);
            }
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    @Override
    public CompletableFuture<Void> delete(String path, Optional<Long> optExpectedVersion) {
        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            this.zkc.delete(path, expectedVersion, (rc, path1, ctx) -> this.executor.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }), null);
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    @Override
    public void close() throws Exception {
        if (this.isZkManaged) {
            this.zkc.close();
        }
        this.executor.shutdownNow();
    }

    private static Stat getStat(org.apache.pulsar.shade.org.apache.zookeeper.data.Stat zkStat) {
        return new Stat(zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime());
    }

    private static MetadataStoreException getException(KeeperException.Code code, String path) {
        KeeperException ex = KeeperException.create(code, path);
        switch (code) {
            case BADVERSION: {
                return new MetadataStoreException.BadVersionException(ex);
            }
            case NONODE: {
                return new MetadataStoreException.NotFoundException(ex);
            }
        }
        return new MetadataStoreException(ex);
    }
}

