/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.admin.internal;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.BaseResource;
import org.apache.pulsar.client.admin.internal.WebTargets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.javax.ws.rs.client.Entity;
import org.apache.pulsar.shade.javax.ws.rs.client.InvocationCallback;
import org.apache.pulsar.shade.javax.ws.rs.client.WebTarget;
import org.apache.pulsar.shade.javax.ws.rs.core.GenericType;
import org.apache.pulsar.shade.javax.ws.rs.core.MultivaluedMap;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicsImpl
extends BaseResource
implements Topics {
    // Web target for the "/admin" sub-path; assigned once in the constructor.
    private final WebTarget adminTopics;
    // Web target for the "/admin/v2" sub-path; assigned once in the constructor.
    private final WebTarget adminV2Topics;
    // NOTE(review): these look like HTTP header names; they are not referenced in this
    // part of the file, so confirm their usage where messages are read.
    private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
    private final String MESSAGE_ID = "X-Pulsar-Message-ID";
    private final String PUBLISH_TIME = "X-Pulsar-publish-time";
    // SLF4J logger shared by all instances of this class.
    private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);

    /**
     * Builds the topics admin resource.
     *
     * @param web base web target from which the {@code /admin} and {@code /admin/v2} targets are derived
     * @param auth authentication forwarded to the superclass constructor
     * @param readTimeoutMs timeout forwarded to the superclass constructor; the blocking methods of
     *        this class wait this many milliseconds for their async counterparts
     */
    public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
        super(auth, readTimeoutMs);
        adminV2Topics = web.path("/admin/v2");
        adminTopics = web.path("/admin");
    }

    /**
     * Blocking counterpart of {@link #getListAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param namespace namespace name forwarded to the async call
     * @return the topic list produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public List<String> getList(String namespace) throws PulsarAdminException {
        try {
            return this.getListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously lists the topics of a namespace by querying the {@code persistent} and
     * {@code non-persistent} namespace paths and merging both answers.
     *
     * @param namespace namespace name, parsed with {@link NamespaceName#get(String)}
     * @return future completed with the merged, duplicate-free topic names (the merge goes through
     *         a set, so ordering is unspecified); completed exceptionally if either request fails
     */
    @Override
    public CompletableFuture<List<String>> getListAsync(String namespace) {
        NamespaceName ns = NamespaceName.get(namespace);
        WebTarget persistentPath = this.namespacePath("persistent", ns, new String[0]);
        WebTarget nonPersistentPath = this.namespacePath("non-persistent", ns, new String[0]);
        // Typed futures: with raw CompletableFuture the combiner's arguments are Object,
        // so the stream() calls in the merge below would not type-check.
        final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
        final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
        this.asyncGetRequest(persistentPath, new InvocationCallback<List<String>>() {

            @Override
            public void completed(List<String> topics) {
                persistentList.complete(topics);
            }

            @Override
            public void failed(Throwable throwable) {
                persistentList.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        this.asyncGetRequest(nonPersistentPath, new InvocationCallback<List<String>>() {

            @Override
            public void completed(List<String> topics) {
                nonPersistentList.complete(topics);
            }

            @Override
            public void failed(Throwable throwable) {
                nonPersistentList.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        // Collect through a set to drop names reported by both listings.
        return persistentList.thenCombine(nonPersistentList,
                (l1, l2) -> new ArrayList<String>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
    }

    /**
     * Blocking counterpart of {@link #getPartitionedTopicListAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param namespace namespace name forwarded to the async call
     * @return the partitioned-topic list produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException {
        try {
            return this.getPartitionedTopicListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously lists the partitioned topics of a namespace by querying the
     * {@code partitioned} sub-path of both the {@code persistent} and {@code non-persistent}
     * namespace paths and merging both answers.
     *
     * @param namespace namespace name, parsed with {@link NamespaceName#get(String)}
     * @return future completed with the merged, duplicate-free topic names (the merge goes through
     *         a set, so ordering is unspecified); completed exceptionally if either request fails
     */
    @Override
    public CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace) {
        NamespaceName ns = NamespaceName.get(namespace);
        WebTarget persistentPath = this.namespacePath("persistent", ns, "partitioned");
        WebTarget nonPersistentPath = this.namespacePath("non-persistent", ns, "partitioned");
        // Typed futures: with raw CompletableFuture the combiner's arguments are Object,
        // so the stream() calls in the merge below would not type-check.
        final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
        final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
        this.asyncGetRequest(persistentPath, new InvocationCallback<List<String>>() {

            @Override
            public void completed(List<String> topics) {
                persistentList.complete(topics);
            }

            @Override
            public void failed(Throwable throwable) {
                persistentList.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        this.asyncGetRequest(nonPersistentPath, new InvocationCallback<List<String>>() {

            @Override
            public void completed(List<String> topics) {
                nonPersistentList.complete(topics);
            }

            @Override
            public void failed(Throwable throwable) {
                nonPersistentList.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        // Collect through a set to drop names reported by both listings.
        return persistentList.thenCombine(nonPersistentList,
                (l1, l2) -> new ArrayList<String>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
    }

    /**
     * Blocking counterpart of {@link #getListInBundleAsync(String, String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param namespace namespace name forwarded to the async call
     * @param bundleRange bundle range forwarded to the async call
     * @return the topic list produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public List<String> getListInBundle(String namespace, String bundleRange) throws PulsarAdminException {
        try {
            return this.getListInBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously lists the topics of one namespace bundle. The request targets the
     * {@code non-persistent} namespace path followed by the bundle range.
     *
     * @param namespace namespace name, parsed with {@link NamespaceName#get(String)}
     * @param bundleRange bundle range appended to the namespace path
     * @return future completed with the response list, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<List<String>> getListInBundleAsync(String namespace, String bundleRange) {
        NamespaceName parsedNamespace = NamespaceName.get(namespace);
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        WebTarget bundleTarget = namespacePath("non-persistent", parsedNamespace, bundleRange);
        asyncGetRequest(bundleTarget, new InvocationCallback<List<String>>() {
            @Override
            public void completed(List<String> topics) {
                result.complete(topics);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #getPermissionsAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the permissions map produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public Map<String, Set<AuthAction>> getPermissions(String topic) throws PulsarAdminException {
        try {
            return this.getPermissionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously fetches the permissions of a topic from its {@code permissions} path.
     *
     * @param topic topic name, parsed with {@link TopicName#get(String)}
     * @return future completed with the role-to-actions map from the response, or exceptionally
     *         with the API exception derived from the failure's cause
     */
    @Override
    public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String topic) {
        WebTarget permissionsTarget = topicPath(TopicName.get(topic), "permissions");
        CompletableFuture<Map<String, Set<AuthAction>>> result = new CompletableFuture<>();
        asyncGetRequest(permissionsTarget, new InvocationCallback<Map<String, Set<AuthAction>>>() {
            @Override
            public void completed(Map<String, Set<AuthAction>> granted) {
                result.complete(granted);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #grantPermissionAsync(String, String, Set)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param role role forwarded to the async call
     * @param actions actions forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void grantPermission(String topic, String role, Set<AuthAction> actions) throws PulsarAdminException {
        try {
            this.grantPermissionAsync(topic, role, actions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously posts the given actions as JSON to the topic's {@code permissions/<role>} path.
     *
     * @param topic topic name, parsed with {@link TopicName#get(String)}
     * @param role role appended to the permissions path
     * @param actions actions sent as the request entity
     * @return the future returned by the underlying POST request helper
     */
    @Override
    public CompletableFuture<Void> grantPermissionAsync(String topic, String role, Set<AuthAction> actions) {
        WebTarget roleTarget = topicPath(TopicName.get(topic), "permissions", role);
        return asyncPostRequest(roleTarget, Entity.entity(actions, "application/json"));
    }

    /**
     * Blocking counterpart of {@link #revokePermissionsAsync(String, String)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param role role forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void revokePermissions(String topic, String role) throws PulsarAdminException {
        try {
            this.revokePermissionsAsync(topic, role).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously issues a DELETE against the topic's {@code permissions/<role>} path.
     *
     * @param topic topic name, parsed with {@link TopicName#get(String)}
     * @param role role appended to the permissions path
     * @return the future returned by the underlying DELETE request helper
     */
    @Override
    public CompletableFuture<Void> revokePermissionsAsync(String topic, String role) {
        return asyncDeleteRequest(topicPath(TopicName.get(topic), "permissions", role));
    }

    /**
     * Blocking counterpart of {@link #createPartitionedTopicAsync(String, int)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param numPartitions partition count forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
        try {
            this.createPartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Blocking counterpart of {@link #createNonPartitionedTopicAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
        try {
            this.createNonPartitionedTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Blocking counterpart of {@link #createMissedPartitionsAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void createMissedPartitions(String topic) throws PulsarAdminException {
        try {
            this.createMissedPartitionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously issues a PUT with an empty JSON entity against the topic's own path.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return the future returned by the underlying PUT request helper
     */
    @Override
    public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget topicTarget = topicPath(topicName, new String[0]);
        return asyncPutRequest(topicTarget, Entity.entity("", "application/json"));
    }

    /**
     * Asynchronously issues a PUT carrying the partition count as JSON against the topic's
     * {@code partitions} path.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param numPartitions number of partitions; must be greater than zero
     * @return the future returned by the underlying PUT request helper
     * @throws IllegalArgumentException synchronously, if {@code numPartitions} is not positive
     */
    @Override
    public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
        Preconditions.checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
        TopicName topicName = validateTopic(topic);
        WebTarget partitionsTarget = topicPath(topicName, "partitions");
        return asyncPutRequest(partitionsTarget, Entity.entity(Integer.valueOf(numPartitions), "application/json"));
    }

    /**
     * Asynchronously issues a POST with an empty JSON entity against the topic's
     * {@code createMissedPartitions} path.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return the future returned by the underlying POST request helper
     */
    @Override
    public CompletableFuture<Void> createMissedPartitionsAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget missedPartitionsTarget = topicPath(topicName, "createMissedPartitions");
        return asyncPostRequest(missedPartitionsTarget, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of {@link #updatePartitionedTopicAsync(String, int)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param numPartitions partition count forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void updatePartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
        try {
            this.updatePartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Shorthand for {@link #updatePartitionedTopicAsync(String, int, boolean)} with
     * {@code updateLocalTopicOnly} set to {@code false}.
     *
     * @param topic topic name forwarded to the three-argument overload
     * @param numPartitions partition count forwarded to the three-argument overload
     * @return the future returned by the three-argument overload
     */
    @Override
    public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) {
        final boolean updateLocalTopicOnly = false;
        return updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly);
    }

    /**
     * Blocking counterpart of {@link #updatePartitionedTopicAsync(String, int, boolean)}; waits at
     * most {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param numPartitions partition count forwarded to the async call
     * @param updateLocalTopicOnly flag forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly) throws PulsarAdminException {
        try {
            this.updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously issues a POST carrying the partition count as JSON against the topic's
     * {@code partitions} path, with the {@code updateLocalTopicOnly} query parameter set.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param numPartitions number of partitions; must be greater than zero
     * @param updateLocalTopicOnly value sent as the {@code updateLocalTopicOnly} query parameter
     * @return the future returned by the underlying POST request helper
     * @throws IllegalArgumentException synchronously, if {@code numPartitions} is not positive
     */
    @Override
    public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly) {
        Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
        TopicName topicName = validateTopic(topic);
        WebTarget partitionsTarget = topicPath(topicName, "partitions")
                .queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly));
        return asyncPostRequest(partitionsTarget, Entity.entity(Integer.valueOf(numPartitions), "application/json"));
    }

    /**
     * Blocking counterpart of {@link #getPartitionedTopicMetadataAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the metadata produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws PulsarAdminException {
        try {
            return this.getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously fetches the partitioned-topic metadata from the topic's {@code partitions} path.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return future completed with the response metadata, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget partitionsTarget = topicPath(topicName, "partitions");
        CompletableFuture<PartitionedTopicMetadata> result = new CompletableFuture<>();
        asyncGetRequest(partitionsTarget, new InvocationCallback<PartitionedTopicMetadata>() {
            @Override
            public void completed(PartitionedTopicMetadata metadata) {
                result.complete(metadata);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Shorthand that forwards to the two-argument {@code deletePartitionedTopic} overload with
     * {@code false} as its flag.
     *
     * @param topic topic name forwarded to the overload
     * @throws PulsarAdminException propagated from the overload
     */
    @Override
    public void deletePartitionedTopic(String topic) throws PulsarAdminException {
        deletePartitionedTopic(topic, false);
    }

    /**
     * Shorthand that forwards to the two-argument {@code deletePartitionedTopicAsync} overload
     * with {@code false} as its flag.
     *
     * @param topic topic name forwarded to the overload
     * @return the future returned by the overload
     */
    @Override
    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
        return deletePartitionedTopicAsync(topic, false);
    }

    /**
     * Blocking counterpart of {@link #deletePartitionedTopicAsync(String, boolean, boolean)}; waits
     * at most {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param force flag forwarded to the async call
     * @param deleteSchema flag forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void deletePartitionedTopic(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException {
        try {
            this.deletePartitionedTopicAsync(topic, force, deleteSchema).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously issues a DELETE against the topic's {@code partitions} path, passing the
     * {@code force} and {@code deleteSchema} flags as query parameters.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param force value sent as the {@code force} query parameter
     * @param deleteSchema value sent as the {@code deleteSchema} query parameter
     * @return the future returned by the underlying DELETE request helper
     */
    @Override
    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean force, boolean deleteSchema) {
        TopicName topicName = validateTopic(topic);
        WebTarget partitionsTarget = topicPath(topicName, "partitions");
        WebTarget withForce = partitionsTarget.queryParam("force", String.valueOf(force));
        WebTarget withSchemaFlag = withForce.queryParam("deleteSchema", String.valueOf(deleteSchema));
        return asyncDeleteRequest(withSchemaFlag);
    }

    /**
     * Shorthand that forwards to the two-argument {@code delete} overload with {@code false}
     * as its flag.
     *
     * @param topic topic name forwarded to the overload
     * @throws PulsarAdminException propagated from the overload
     */
    @Override
    public void delete(String topic) throws PulsarAdminException {
        delete(topic, false);
    }

    /**
     * Shorthand that forwards to the two-argument {@code deleteAsync} overload with {@code false}
     * as its flag.
     *
     * @param topic topic name forwarded to the overload
     * @return the future returned by the overload
     */
    @Override
    public CompletableFuture<Void> deleteAsync(String topic) {
        return deleteAsync(topic, false);
    }

    /**
     * Blocking counterpart of {@link #deleteAsync(String, boolean, boolean)}; waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @param force flag forwarded to the async call
     * @param deleteSchema flag forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void delete(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException {
        try {
            this.deleteAsync(topic, force, deleteSchema).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously issues a DELETE against the topic's own path, passing the {@code force}
     * and {@code deleteSchema} flags as query parameters.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param force value sent as the {@code force} query parameter
     * @param deleteSchema value sent as the {@code deleteSchema} query parameter
     * @return the future returned by the underlying DELETE request helper
     */
    @Override
    public CompletableFuture<Void> deleteAsync(String topic, boolean force, boolean deleteSchema) {
        TopicName topicName = validateTopic(topic);
        WebTarget topicTarget = topicPath(topicName, new String[0]);
        WebTarget withForce = topicTarget.queryParam("force", String.valueOf(force));
        WebTarget withSchemaFlag = withForce.queryParam("deleteSchema", String.valueOf(deleteSchema));
        return asyncDeleteRequest(withSchemaFlag);
    }

    /**
     * Blocking counterpart of {@link #unloadAsync(String)}; waits at most {@code readTimeoutMs}
     * milliseconds for completion.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public void unload(String topic) throws PulsarAdminException {
        try {
            this.unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously issues a PUT with an empty JSON entity against the topic's {@code unload} path.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return the future returned by the underlying PUT request helper
     */
    @Override
    public CompletableFuture<Void> unloadAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget unloadTarget = topicPath(topicName, "unload");
        return asyncPutRequest(unloadTarget, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of {@link #terminateTopicAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the message id produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public MessageId terminateTopic(String topic) throws PulsarAdminException {
        try {
            return this.terminateTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously posts an empty JSON entity to the topic's {@code terminate} path and
     * exposes the {@link MessageIdImpl} carried by the response.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return future completed with the returned message id; completed exceptionally if building
     *         the request throws {@link PulsarAdminException} or if the HTTP call fails
     */
    @Override
    public CompletableFuture<MessageId> terminateTopicAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        CompletableFuture<MessageId> result = new CompletableFuture<>();
        try {
            WebTarget terminateTarget = topicPath(topicName, "terminate");
            InvocationCallback<MessageIdImpl> callback = new InvocationCallback<MessageIdImpl>() {
                @Override
                public void completed(MessageIdImpl lastMessageId) {
                    result.complete(lastMessageId);
                }

                @Override
                public void failed(Throwable error) {
                    log.warn("[{}] Failed to perform http post request: {}", terminateTarget.getUri(), error.getMessage());
                    result.completeExceptionally(getApiException(error.getCause()));
                }
            };
            request(terminateTarget).async().post(Entity.entity("", "application/json"), callback);
        } catch (PulsarAdminException cae) {
            // Failure while preparing the request is reported through the future, not thrown.
            result.completeExceptionally(cae);
        }
        return result;
    }

    /**
     * Blocking counterpart of {@link #getSubscriptionsAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the subscription list produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public List<String> getSubscriptions(String topic) throws PulsarAdminException {
        try {
            return this.getSubscriptionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously fetches the list returned by the topic's {@code subscriptions} path.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return future completed with the response list, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<List<String>> getSubscriptionsAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget subscriptionsTarget = topicPath(topicName, "subscriptions");
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        asyncGetRequest(subscriptionsTarget, new InvocationCallback<List<String>>() {
            @Override
            public void completed(List<String> subscriptions) {
                result.complete(subscriptions);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #getStatsAsync(String, boolean, boolean)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @param getPreciseBacklog flag forwarded to the async call
     * @param subscriptionBacklogSize flag forwarded to the async call
     * @return the stats produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public TopicStats getStats(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize) throws PulsarAdminException {
        try {
            return this.getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously fetches the topic stats from the topic's {@code stats} path, passing both
     * flags as query parameters.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param getPreciseBacklog value sent as the {@code getPreciseBacklog} query parameter
     * @param subscriptionBacklogSize value sent as the {@code subscriptionBacklogSize} query parameter
     * @return future completed with the response stats, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<TopicStats> getStatsAsync(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
        TopicName topicName = validateTopic(topic);
        WebTarget statsTarget = topicPath(topicName, "stats");
        statsTarget = statsTarget.queryParam("getPreciseBacklog", getPreciseBacklog);
        statsTarget = statsTarget.queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
        CompletableFuture<TopicStats> result = new CompletableFuture<>();
        asyncGetRequest(statsTarget, new InvocationCallback<TopicStats>() {
            @Override
            public void completed(TopicStats stats) {
                result.complete(stats);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Shorthand for {@link #getInternalStats(String, boolean)} with {@code metadata} set to
     * {@code false}.
     *
     * @param topic topic name forwarded to the two-argument overload
     * @return the internal stats returned by the two-argument overload
     * @throws PulsarAdminException propagated from the two-argument overload
     */
    @Override
    public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
        final boolean metadata = false;
        return getInternalStats(topic, metadata);
    }

    /**
     * Blocking counterpart of {@link #getInternalStatsAsync(String, boolean)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @param metadata flag forwarded to the async call
     * @return the internal stats produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public PersistentTopicInternalStats getInternalStats(String topic, boolean metadata) throws PulsarAdminException {
        try {
            return this.getInternalStatsAsync(topic, metadata).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Shorthand for {@link #getInternalStatsAsync(String, boolean)} with {@code metadata} set to
     * {@code false}.
     *
     * @param topic topic name forwarded to the two-argument overload
     * @return the future returned by the two-argument overload
     */
    @Override
    public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic) {
        final boolean metadata = false;
        return getInternalStatsAsync(topic, metadata);
    }

    /**
     * Asynchronously fetches the internal stats from the topic's {@code internalStats} path,
     * passing {@code metadata} as a query parameter.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param metadata value sent as the {@code metadata} query parameter
     * @return future completed with the response stats, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic, boolean metadata) {
        TopicName topicName = validateTopic(topic);
        WebTarget internalStatsTarget = topicPath(topicName, "internalStats").queryParam("metadata", metadata);
        CompletableFuture<PersistentTopicInternalStats> result = new CompletableFuture<>();
        asyncGetRequest(internalStatsTarget, new InvocationCallback<PersistentTopicInternalStats>() {
            @Override
            public void completed(PersistentTopicInternalStats stats) {
                result.complete(stats);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #getInternalInfoAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the JSON object produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public JsonObject getInternalInfo(String topic) throws PulsarAdminException {
        try {
            return this.getInternalInfoAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously fetches the body of the topic's {@code internal-info} path as a string and
     * parses it into a {@link JsonObject} with Gson.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @return future completed with the parsed JSON object; completed exceptionally with a
     *         {@link PulsarAdminException} if the body cannot be parsed, or with the API exception
     *         derived from the failure's cause if the request fails
     */
    @Override
    public CompletableFuture<JsonObject> getInternalInfoAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "internal-info");
        final CompletableFuture<JsonObject> future = new CompletableFuture<JsonObject>();
        this.asyncGetRequest(path, new InvocationCallback<String>() {

            @Override
            public void completed(String response) {
                try {
                    future.complete(new Gson().fromJson(response, JsonObject.class));
                } catch (RuntimeException e) {
                    // A parse failure thrown out of this callback would never reach the
                    // future; report it there so callers get the error directly.
                    future.completeExceptionally(new PulsarAdminException(e));
                }
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    /**
     * Blocking counterpart of {@link #getPartitionedStatsAsync(String, boolean, boolean, boolean)};
     * waits at most {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @param perPartition flag forwarded to the async call
     * @param getPreciseBacklog flag forwarded to the async call
     * @param subscriptionBacklogSize flag forwarded to the async call
     * @return the partitioned stats produced by the async call
     * @throws PulsarAdminException if the async call fails, the wait is interrupted,
     *         or the wait times out ({@code PulsarAdminException.TimeoutException})
     */
    @Override
    public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) throws PulsarAdminException {
        try {
            return this.getPartitionedStatsAsync(topic, perPartition, getPreciseBacklog, subscriptionBacklogSize).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            // Any other cause previously hit a blind cast and surfaced as ClassCastException;
            // wrap it so the real failure is reported.
            throw new PulsarAdminException(cause == null ? e : cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    /**
     * Asynchronously fetches the partitioned-topic stats from the topic's
     * {@code partitioned-stats} path, passing all three flags as query parameters. When
     * {@code perPartition} is {@code false} the response's {@code partitions} collection is
     * cleared before the future is completed.
     *
     * @param topic topic name, checked through {@code validateTopic}
     * @param perPartition value sent as the {@code perPartition} query parameter; also decides
     *        whether the per-partition entries are kept in the result
     * @param getPreciseBacklog value sent as the {@code getPreciseBacklog} query parameter
     * @param subscriptionBacklogSize value sent as the {@code subscriptionBacklogSize} query parameter
     * @return future completed with the response stats, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, final boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
        TopicName topicName = validateTopic(topic);
        WebTarget statsTarget = topicPath(topicName, "partitioned-stats")
                .queryParam("perPartition", perPartition)
                .queryParam("getPreciseBacklog", getPreciseBacklog)
                .queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
        CompletableFuture<PartitionedTopicStats> result = new CompletableFuture<>();
        asyncGetRequest(statsTarget, new InvocationCallback<PartitionedTopicStats>() {
            @Override
            public void completed(PartitionedTopicStats stats) {
                boolean dropPartitionEntries = !perPartition;
                if (dropPartitionEntries) {
                    stats.partitions.clear();
                }
                result.complete(stats);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@code getPartitionedInternalStatsAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public PartitionedTopicInternalStats getPartitionedInternalStats(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<PartitionedTopicInternalStats> pending = getPartitionedInternalStatsAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Issues an asynchronous GET on the topic's {@code partitioned-internalStats} resource.
     *
     * @return a future completed with the response, or exceptionally with the API exception
     *         derived from the failure's cause
     */
    @Override
    public CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic) {
        TopicName parsedTopic = validateTopic(topic);
        WebTarget target = topicPath(parsedTopic, "partitioned-internalStats");
        final CompletableFuture<PartitionedTopicInternalStats> result = new CompletableFuture<>();
        asyncGetRequest(target, new InvocationCallback<PartitionedTopicInternalStats>() {

            @Override
            public void completed(PartitionedTopicInternalStats stats) {
                result.complete(stats);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of the two-argument {@code deleteSubscriptionAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = deleteSubscriptionAsync(topic, subName);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Blocking counterpart of the three-argument {@code deleteSubscriptionAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = deleteSubscriptionAsync(topic, subName, force);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Convenience overload: delegates to the three-argument variant with {@code force} set to
     * {@code false}.
     */
    @Override
    public CompletableFuture<Void> deleteSubscriptionAsync(String topic, String subName) {
        final boolean notForced = false;
        return deleteSubscriptionAsync(topic, subName, notForced);
    }

    /**
     * Sends an asynchronous DELETE to the topic's {@code subscription/<encoded name>} resource,
     * passing {@code force} as a query parameter.
     */
    @Override
    public CompletableFuture<Void> deleteSubscriptionAsync(String topic, String subName, boolean force) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription).queryParam("force", force);
        return asyncDeleteRequest(target);
    }

    /**
     * Blocking counterpart of {@code skipAllMessagesAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void skipAllMessages(String topic, String subName) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = skipAllMessagesAsync(topic, subName);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous POST with an empty JSON entity to the subscription's
     * {@code skip_all} resource.
     */
    @Override
    public CompletableFuture<Void> skipAllMessagesAsync(String topic, String subName) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "skip_all");
        return asyncPostRequest(target, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of {@code skipMessagesAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void skipMessages(String topic, String subName, long numMessages) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = skipMessagesAsync(topic, subName, numMessages);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous POST with an empty JSON entity to the subscription's
     * {@code skip/<numMessages>} resource.
     */
    @Override
    public CompletableFuture<Void> skipMessagesAsync(String topic, String subName, long numMessages) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        String count = String.valueOf(numMessages);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "skip", count);
        return asyncPostRequest(target, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of the time-based {@code expireMessagesAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void expireMessages(String topic, String subName, long expireTimeInSeconds) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = expireMessagesAsync(topic, subName, expireTimeInSeconds);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous POST with an empty JSON entity to the subscription's
     * {@code expireMessages/<expireTimeInSeconds>} resource.
     */
    @Override
    public CompletableFuture<Void> expireMessagesAsync(String topic, String subName, long expireTimeInSeconds) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        String seconds = String.valueOf(expireTimeInSeconds);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "expireMessages", seconds);
        return asyncPostRequest(target, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of the message-id based {@code expireMessagesAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = expireMessagesAsync(topic, subscriptionName, messageId, isExcluded);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous POST to the subscription's {@code expireMessages} resource. The JSON
     * body is a {@code ResetCursorData} built from {@code messageId} with its excluded flag set
     * to {@code isExcluded}.
     */
    @Override
    public CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName, MessageId messageId, boolean isExcluded) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subscriptionName);
        ResetCursorData body = new ResetCursorData(messageId);
        body.setExcluded(isExcluded);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "expireMessages");
        return asyncPostRequest(target, Entity.entity(body, "application/json"));
    }

    /**
     * Blocking counterpart of {@code expireMessagesForAllSubscriptionsAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous POST with an empty JSON entity to the topic's
     * {@code all_subscription/expireMessages/<expireTimeInSeconds>} resource.
     */
    @Override
    public CompletableFuture<Void> expireMessagesForAllSubscriptionsAsync(String topic, long expireTimeInSeconds) {
        TopicName parsedTopic = validateTopic(topic);
        String seconds = String.valueOf(expireTimeInSeconds);
        WebTarget target = topicPath(parsedTopic, "all_subscription", "expireMessages", seconds);
        return asyncPostRequest(target, Entity.entity("", "application/json"));
    }

    /**
     * Asynchronously GETs the subscription's {@code position/<messagePosition>} resource and
     * converts the HTTP response into messages via {@code getMessagesFromHttpResponse}.
     *
     * @return a future completed with the decoded messages, or exceptionally with an API
     *         exception when the request or the decoding fails
     */
    private CompletableFuture<List<Message<byte[]>>> peekNthMessage(String topic, String subName, int messagePosition) {
        final TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        String position = String.valueOf(messagePosition);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "position", position);
        final CompletableFuture<List<Message<byte[]>>> result = new CompletableFuture<>();
        asyncGetRequest(target, new InvocationCallback<Response>() {

            @Override
            public void completed(Response httpResponse) {
                try {
                    List<Message<byte[]>> decoded = TopicsImpl.this.getMessagesFromHttpResponse(parsedTopic.toString(), httpResponse);
                    result.complete(decoded);
                } catch (Exception decodeError) {
                    result.completeExceptionally(TopicsImpl.this.getApiException(decodeError));
                }
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@code peekMessagesAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for the peeked messages.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException {
        try {
            CompletableFuture<List<Message<byte[]>>> pending = peekMessagesAsync(topic, subName, numMessages);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Starts peeking up to {@code numMessages} messages from the subscription, beginning at
     * position 1, and returns the future that the recursive helper completes.
     *
     * @throws IllegalArgumentException (from {@code Preconditions.checkArgument}) when
     *         {@code numMessages} is not positive
     */
    @Override
    public CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages) {
        Preconditions.checkArgument(numMessages > 0);
        CompletableFuture<List<Message<byte[]>>> result = new CompletableFuture<>();
        List<Message<byte[]>> collected = Lists.newArrayList();
        peekMessagesAsync(topic, subName, numMessages, collected, result, 1);
        return result;
    }

    /**
     * Recursive step of the peek operation.
     *
     * <p>Completes {@code future} with the accumulated {@code messages} once {@code numMessages}
     * drops to zero or below. Otherwise it peeks position {@code nthMessage}, appends at most
     * {@code numMessages} of the returned messages, and recurses on the next position with the
     * remaining count reduced by the size of the returned list. A not-found failure is logged
     * and ends the peek with whatever was collected so far; any other failure fails the future.
     */
    private void peekMessagesAsync(String topic, String subName, int numMessages, List<Message<byte[]>> messages, CompletableFuture<List<Message<byte[]>>> future, int nthMessage) {
        if (numMessages <= 0) {
            future.complete(messages);
            return;
        }
        peekNthMessage(topic, subName, nthMessage).handle((peeked, error) -> {
            if (error == null) {
                int wanted = Math.min(peeked.size(), numMessages);
                messages.addAll(peeked.subList(0, wanted));
                peekMessagesAsync(topic, subName, numMessages - peeked.size(), messages, future, nthMessage + 1);
            } else if (error instanceof PulsarAdminException.NotFoundException) {
                log.warn("Exception '{}' occurred while trying to peek Messages.", (Object) error.getMessage());
                future.complete(messages);
            } else {
                future.completeExceptionally(error);
            }
            return null;
        });
    }

    /**
     * Blocking counterpart of {@code examineMessageAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for the message.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public Message<byte[]> examineMessage(String topic, String initialPosition, long messagePosition) throws PulsarAdminException {
        try {
            CompletableFuture<Message<byte[]>> pending = examineMessageAsync(topic, initialPosition, messagePosition);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Asynchronously GETs the topic's {@code examinemessage} resource, sending
     * {@code initialPosition} and {@code messagePosition} as query parameters.
     *
     * @return a future completed with the first decoded message, or with {@code null} when the
     *         response decodes to no messages; completed exceptionally with an API exception
     *         when the request or the decoding fails
     */
    @Override
    public CompletableFuture<Message<byte[]>> examineMessageAsync(String topic, String initialPosition, long messagePosition) {
        final TopicName parsedTopic = validateTopic(topic);
        WebTarget target = topicPath(parsedTopic, "examinemessage")
                .queryParam("initialPosition", initialPosition)
                .queryParam("messagePosition", messagePosition);
        final CompletableFuture<Message<byte[]>> result = new CompletableFuture<>();
        asyncGetRequest(target, new InvocationCallback<Response>() {

            @Override
            public void completed(Response httpResponse) {
                try {
                    List<Message<byte[]>> decoded = TopicsImpl.this.getMessagesFromHttpResponse(parsedTopic.toString(), httpResponse);
                    result.complete(decoded.size() > 0 ? decoded.get(0) : null);
                } catch (Exception decodeError) {
                    result.completeExceptionally(TopicsImpl.this.getApiException(decodeError));
                }
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Fetches the message stored at {@code ledgerId}/{@code entryId} via
     * {@code getRemoteMessageById}.
     *
     * <p>A not-found failure is logged and the returned future is completed with the (absent)
     * result value instead of failing; every other failure is propagated unchanged.
     */
    @Override
    public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
        CompletableFuture<Message<byte[]>> result = new CompletableFuture<>();
        getRemoteMessageById(topic, ledgerId, entryId).handle((message, error) -> {
            if (error == null) {
                result.complete(message);
            } else if (error instanceof PulsarAdminException.NotFoundException) {
                log.warn("Exception '{}' occurred while trying to get message.", (Object) error.getMessage());
                result.complete(message);
            } else {
                result.completeExceptionally(error);
            }
            return null;
        });
        return result;
    }

    /**
     * Asynchronously GETs the topic's {@code ledger/<ledgerId>/entry/<entryId>} resource and
     * completes the future with the first message decoded from the response.
     *
     * <p>Any exception raised while decoding (including an empty decoded list) is converted with
     * {@code getApiException} and fails the future.
     */
    private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
        final TopicName parsedTopic = validateTopic(topic);
        String ledger = Long.toString(ledgerId);
        String entry = Long.toString(entryId);
        WebTarget target = topicPath(parsedTopic, "ledger", ledger, "entry", entry);
        final CompletableFuture<Message<byte[]>> result = new CompletableFuture<>();
        asyncGetRequest(target, new InvocationCallback<Response>() {

            @Override
            public void completed(Response httpResponse) {
                try {
                    List<Message<byte[]>> decoded = TopicsImpl.this.getMessagesFromHttpResponse(parsedTopic.toString(), httpResponse);
                    result.complete(decoded.get(0));
                } catch (Exception decodeError) {
                    result.completeExceptionally(TopicsImpl.this.getApiException(decodeError));
                }
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@code getMessageByIdAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for the message.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException {
        try {
            CompletableFuture<Message<byte[]>> pending = getMessageByIdAsync(topic, ledgerId, entryId);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Synchronously PUTs {@code messageId} as JSON to the topic's
     * {@code subscription/<encoded name>} resource.
     *
     * @throws PulsarAdminException any exception raised while building or sending the request,
     *         converted with {@code getApiException}
     */
    @Override
    public void createSubscription(String topic, String subscriptionName, MessageId messageId) throws PulsarAdminException {
        try {
            TopicName parsedTopic = validateTopic(topic);
            String encodedSubscription = Codec.encode(subscriptionName);
            WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription);
            request(target).put(Entity.entity(messageId, "application/json"), ErrorData.class);
        } catch (Exception cause) {
            throw getApiException(cause);
        }
    }

    /**
     * Sends an asynchronous PUT carrying {@code messageId} as JSON to the topic's
     * {@code subscription/<encoded name>} resource.
     */
    @Override
    public CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subscriptionName);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription);
        return asyncPutRequest(target, Entity.entity(messageId, "application/json"));
    }

    /**
     * Synchronously POSTs an empty JSON entity to the subscription's
     * {@code resetcursor/<timestamp>} resource.
     *
     * @throws PulsarAdminException any exception raised while building or sending the request,
     *         converted with {@code getApiException}
     */
    @Override
    public void resetCursor(String topic, String subName, long timestamp) throws PulsarAdminException {
        try {
            TopicName parsedTopic = validateTopic(topic);
            String encodedSubscription = Codec.encode(subName);
            String timestampSegment = String.valueOf(timestamp);
            WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "resetcursor", timestampSegment);
            request(target).post(Entity.entity("", "application/json"), ErrorData.class);
        } catch (Exception cause) {
            throw getApiException(cause);
        }
    }

    /**
     * Sends an asynchronous POST with an empty JSON entity to the subscription's
     * {@code resetcursor/<timestamp>} resource.
     */
    @Override
    public CompletableFuture<Void> resetCursorAsync(String topic, String subName, long timestamp) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        String timestampSegment = String.valueOf(timestamp);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "resetcursor", timestampSegment);
        return asyncPostRequest(target, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of the message-id based {@code resetCursorAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out. Any other
     *         exception raised inside the call is converted with {@code getApiException}
     */
    @Override
    public void resetCursor(String topic, String subName, MessageId messageId) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = resetCursorAsync(topic, subName, messageId);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        } catch (Exception other) {
            throw getApiException(other);
        }
    }

    /**
     * Blocking counterpart of the four-argument {@code resetCursorAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out. Any other
     *         exception raised inside the call is converted with {@code getApiException}
     */
    @Override
    public void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = resetCursorAsync(topic, subName, messageId, isExcluded);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        } catch (Exception other) {
            throw getApiException(other);
        }
    }

    /**
     * Convenience overload: delegates to the four-argument variant with {@code isExcluded} set
     * to {@code false}.
     */
    @Override
    public CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId) {
        final boolean notExcluded = false;
        return resetCursorAsync(topic, subName, messageId, notExcluded);
    }

    /**
     * Sends an asynchronous POST to the subscription's {@code resetcursor} resource. The JSON
     * body is a {@code ResetCursorData} built from {@code messageId} with its excluded flag set
     * to {@code isExcluded}.
     */
    @Override
    public CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded) {
        TopicName parsedTopic = validateTopic(topic);
        String encodedSubscription = Codec.encode(subName);
        WebTarget target = topicPath(parsedTopic, "subscription", encodedSubscription, "resetcursor");
        ResetCursorData body = new ResetCursorData(messageId);
        body.setExcluded(isExcluded);
        return asyncPostRequest(target, Entity.entity(body, "application/json"));
    }

    /**
     * Blocking counterpart of {@code triggerCompactionAsync}: waits at most
     * {@code readTimeoutMs} milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void triggerCompaction(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = triggerCompactionAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous PUT with an empty JSON entity to the topic's {@code compaction}
     * resource.
     */
    @Override
    public CompletableFuture<Void> triggerCompactionAsync(String topic) {
        TopicName parsedTopic = validateTopic(topic);
        WebTarget target = topicPath(parsedTopic, "compaction");
        return asyncPutRequest(target, Entity.entity("", "application/json"));
    }

    /**
     * Blocking counterpart of {@code compactionStatusAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for the status.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public LongRunningProcessStatus compactionStatus(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<LongRunningProcessStatus> pending = compactionStatusAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Issues an asynchronous GET on the topic's {@code compaction} resource.
     *
     * @return a future completed with the reported status, or exceptionally with the API
     *         exception derived from the failure's cause
     */
    @Override
    public CompletableFuture<LongRunningProcessStatus> compactionStatusAsync(String topic) {
        TopicName parsedTopic = validateTopic(topic);
        WebTarget target = topicPath(parsedTopic, "compaction");
        final CompletableFuture<LongRunningProcessStatus> result = new CompletableFuture<>();
        asyncGetRequest(target, new InvocationCallback<LongRunningProcessStatus>() {

            @Override
            public void completed(LongRunningProcessStatus status) {
                result.complete(status);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@code triggerOffloadAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for completion.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = triggerOffloadAsync(topic, messageId);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends an asynchronous PUT carrying {@code messageId} as JSON to the topic's
     * {@code offload} resource.
     *
     * @return a future completed with {@code null} on success. It fails with the API exception
     *         derived from the failure's cause, or with the {@code PulsarAdminException} thrown
     *         while building the request
     */
    @Override
    public CompletableFuture<Void> triggerOffloadAsync(String topic, MessageId messageId) {
        TopicName parsedTopic = validateTopic(topic);
        WebTarget target = topicPath(parsedTopic, "offload");
        final CompletableFuture<Void> result = new CompletableFuture<>();
        InvocationCallback<MessageIdImpl> callback = new InvocationCallback<MessageIdImpl>() {

            @Override
            public void completed(MessageIdImpl ignoredResponse) {
                result.complete(null);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        };
        try {
            request(target).async().put(Entity.entity(messageId, "application/json"), callback);
        } catch (PulsarAdminException requestError) {
            result.completeExceptionally(requestError);
        }
        return result;
    }

    /**
     * Blocking counterpart of {@code offloadStatusAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for the status.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<OffloadProcessStatus> pending = offloadStatusAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Issues an asynchronous GET on the topic's {@code offload} resource.
     *
     * @return a future completed with the reported status, or exceptionally with the API
     *         exception derived from the failure's cause
     */
    @Override
    public CompletableFuture<OffloadProcessStatus> offloadStatusAsync(String topic) {
        TopicName parsedTopic = validateTopic(topic);
        WebTarget target = topicPath(parsedTopic, "offload");
        final CompletableFuture<OffloadProcessStatus> result = new CompletableFuture<>();
        asyncGetRequest(target, new InvocationCallback<OffloadProcessStatus>() {

            @Override
            public void completed(OffloadProcessStatus status) {
                result.complete(status);
            }

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }
        });
        return result;
    }

    /**
     * Builds the REST target for a namespace: the v2 or legacy base target (chosen by
     * {@code namespace.isV2()}), followed by {@code domain}, the namespace string and any
     * extra path {@code parts}.
     */
    private WebTarget namespacePath(String domain, NamespaceName namespace, String ... parts) {
        WebTarget root;
        if (namespace.isV2()) {
            root = this.adminV2Topics;
        } else {
            root = this.adminTopics;
        }
        WebTarget namespaceTarget = root.path(domain).path(namespace.toString());
        return WebTargets.addParts(namespaceTarget, parts);
    }

    /**
     * Builds the REST target for a topic: the v2 or legacy base target (chosen by
     * {@code topic.isV2()}), followed by the topic's REST path and any extra path {@code parts}.
     */
    private WebTarget topicPath(TopicName topic, String ... parts) {
        WebTarget root;
        if (topic.isV2()) {
            root = this.adminV2Topics;
        } else {
            root = this.adminTopics;
        }
        WebTarget topicTarget = root.path(topic.getRestPath());
        return WebTargets.addParts(topicTarget, parts);
    }

    /**
     * Resolves the topic string into a {@code TopicName} through {@code TopicName.get}.
     */
    private TopicName validateTopic(String topic) {
        TopicName parsedTopic = TopicName.get(topic);
        return parsedTopic;
    }

    /**
     * Converts a message-carrying HTTP response into client {@code Message} objects.
     *
     * <p>The message id is taken from the {@code X-Pulsar-Message-ID} header and the payload
     * from the response entity stream. When the {@code X-Pulsar-num-batch-message} header is
     * present the payload is treated as a batch and split by
     * {@code getIndividualMsgsFromBatch}. Otherwise a single message is returned whose
     * properties come from the {@code X-Pulsar-PROPERTY-*} headers, plus {@code publish-time}
     * when that header is present.
     *
     * @param topic    topic name attached to the created messages
     * @param response HTTP response to decode
     * @return the decoded messages (a singleton list for a non-batch payload)
     * @throws Exception the API exception for a non-OK status, or any failure while reading
     *         the entity stream
     */
    private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response response) throws Exception {
        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
            throw this.getApiException(response);
        }
        String msgId = response.getHeaderString("X-Pulsar-Message-ID");
        PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
        try (InputStream stream = (InputStream)response.getEntity()) {
            // Drain the stream to EOF. available() is only an estimate of what can be read
            // without blocking and a single read() may return fewer bytes than requested, so
            // sizing the buffer from available() could silently truncate the payload.
            java.io.ByteArrayOutputStream payload = new java.io.ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int read;
            while ((read = stream.read(chunk)) != -1) {
                payload.write(chunk, 0, read);
            }
            byte[] data = payload.toByteArray();

            Map<String, String> properties = Maps.newTreeMap();
            MultivaluedMap<String, Object> headers = response.getHeaders();
            Object tmp = headers.getFirst("X-Pulsar-publish-time");
            if (tmp != null) {
                properties.put("publish-time", (String)tmp);
            }
            if ((tmp = headers.getFirst("X-Pulsar-null-value")) != null) {
                messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
            }
            tmp = headers.getFirst("X-Pulsar-num-batch-message");
            if (response.getHeaderString("X-Pulsar-num-batch-message") != null) {
                properties.put("X-Pulsar-num-batch-message", (String)tmp);
                return this.getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
            }
            for (Map.Entry<String, List<Object>> entry : headers.entrySet()) {
                String header = entry.getKey();
                // The prefix is stripped from the front, so it must actually be a prefix.
                if (!header.startsWith("X-Pulsar-PROPERTY-")) {
                    continue;
                }
                String keyName = header.substring("X-Pulsar-PROPERTY-".length());
                properties.put(keyName, (String)entry.getValue().get(0));
            }
            return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties, Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
        }
    }

    /**
     * Splits a batch payload into its individual messages.
     *
     * <p>The batch size is read from the {@code X-Pulsar-num-batch-message} entry of
     * {@code properties}. Each message gets the id {@code msgId + ":" + index} and its own
     * property map: a copy of the shared {@code properties} overlaid with the properties from
     * that message's single-message metadata. Copying keeps one message's properties from
     * leaking into the others through a shared mutable map.
     *
     * <p>A message that fails to deserialize is logged and skipped.
     *
     * @param topic              topic name attached to the created messages
     * @param msgId              id of the batch entry
     * @param data               raw batch payload
     * @param properties         properties common to every message of the batch (not modified)
     * @param msgMetadataBuilder metadata builder passed to every created message
     * @return the messages that could be deserialized, in batch order
     */
    private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data, Map<String, String> properties, PulsarApi.MessageMetadata.Builder msgMetadataBuilder) {
        List<Message<byte[]>> ret = new ArrayList<Message<byte[]>>();
        int batchSize = Integer.parseInt(properties.get("X-Pulsar-num-batch-message"));
        ByteBuf buf = Unpooled.wrappedBuffer(data);
        for (int i = 0; i < batchSize; ++i) {
            String batchMsgId = msgId + ":" + i;
            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata.newBuilder();
            try {
                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i, batchSize);
                PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
                // Per-message copy so entries added here stay with this message only.
                Map<String, String> messageProperties = new TreeMap<String, String>(properties);
                if (singleMessageMetadata.getPropertiesCount() > 0) {
                    for (PulsarApi.KeyValue entry : singleMessageMetadata.getPropertiesList()) {
                        messageProperties.put(entry.getKey(), entry.getValue());
                    }
                }
                ret.add(new MessageImpl<byte[]>(topic, batchMsgId, messageProperties, singleMessagePayload, Schema.BYTES, msgMetadataBuilder));
            }
            catch (Exception ex) {
                log.error("Exception occurred while trying to get BatchMsgId: {}", (Object)batchMsgId, (Object)ex);
            }
            finally {
                singleMessageMetadataBuilder.recycle();
            }
        }
        buf.release();
        return ret;
    }

    /**
     * Blocking counterpart of {@code getLastMessageIdAsync}: waits at most {@code readTimeoutMs}
     * milliseconds for the id.
     *
     * @throws PulsarAdminException the cause of a failed async call; also raised when the wait
     *         is interrupted (the interrupt flag is restored first) or times out
     */
    @Override
    public MessageId getLastMessageId(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<MessageId> pending = getLastMessageIdAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Issues an asynchronous GET on the topic's {@code lastMessageId} resource.
     *
     * <p>A response whose batch index is {@code -1} is converted to a plain
     * {@code MessageIdImpl} (ledger id, entry id, partition index); any other response is
     * returned as the batch message id it was decoded to.
     *
     * @return a future completed with the last message id, or exceptionally with the API
     *         exception derived from the failure's cause
     */
    @Override
    public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "lastMessageId");
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        this.asyncGetRequest(path, new InvocationCallback<BatchMessageIdImpl>(){

            @Override
            public void completed(BatchMessageIdImpl response) {
                // Complete exactly once; the former unconditional second complete() was a
                // silent no-op on the non-batch path.
                if (response.getBatchIndex() == -1) {
                    future.complete(new MessageIdImpl(response.getLedgerId(), response.getEntryId(), response.getPartitionIndex()));
                } else {
                    future.complete(response);
                }
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    /**
     * Synchronously reads the {@code "backlogQuotaMap"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the response entity read as a map from backlog quota type to backlog quota
     * @throws PulsarAdminException any exception raised while validating or requesting,
     *         converted through {@code getApiException}
     */
    @Override
    public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException {
        try {
            WebTarget target = this.topicPath(this.validateTopic(topic), "backlogQuotaMap");
            return this.request(target).get(new GenericType<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>>(){});
        } catch (Exception failure) {
            throw this.getApiException(failure);
        }
    }

    /**
     * Synchronously posts a backlog quota as JSON to the {@code "backlogQuota"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param backlogQuota quota sent as the request entity
     * @throws PulsarAdminException any exception raised while validating or requesting,
     *         converted through {@code getApiException}
     */
    @Override
    public void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException {
        try {
            WebTarget target = this.topicPath(this.validateTopic(topic), "backlogQuota");
            this.request(target).post(Entity.entity(backlogQuota, "application/json"), ErrorData.class);
        } catch (Exception failure) {
            throw this.getApiException(failure);
        }
    }

    /**
     * Synchronously issues a DELETE on the {@code "backlogQuota"} resource of a topic, with the
     * {@code backlogQuotaType} query parameter set to {@code destination_storage}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @throws PulsarAdminException any exception raised while validating or requesting,
     *         converted through {@code getApiException}
     */
    @Override
    public void removeBacklogQuota(String topic) throws PulsarAdminException {
        try {
            WebTarget target = this.topicPath(this.validateTopic(topic), "backlogQuota");
            String quotaType = BacklogQuota.BacklogQuotaType.destination_storage.toString();
            WebTarget withType = target.queryParam("backlogQuotaType", quotaType);
            this.request(withType).delete(ErrorData.class);
        } catch (Exception failure) {
            throw this.getApiException(failure);
        }
    }

    /**
     * Blocking counterpart of {@link #getMaxUnackedMessagesOnConsumerAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the value the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public Integer getMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException {
        final CompletableFuture<Integer> pending = this.getMaxUnackedMessagesOnConsumerAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "maxUnackedMessagesOnConsumer"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response value, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<Integer> getMaxUnackedMessagesOnConsumerAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "maxUnackedMessagesOnConsumer");
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<Integer>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(Integer value) {
                result.complete(value);
            }
        });
        return result;
    }

    /**
     * Asynchronously posts {@code maxNum} as JSON to the {@code "maxUnackedMessagesOnConsumer"}
     * resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param maxNum value sent (boxed) as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxUnackedMessagesOnConsumerAsync(String topic, int maxNum) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "maxUnackedMessagesOnConsumer");
        return this.asyncPostRequest(target, Entity.entity(Integer.valueOf(maxNum), "application/json"));
    }

    /**
     * Blocking counterpart of {@link #setMaxUnackedMessagesOnConsumerAsync(String, int)}; waits at
     * most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param maxNum value forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setMaxUnackedMessagesOnConsumer(String topic, int maxNum) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setMaxUnackedMessagesOnConsumerAsync(topic, maxNum);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "maxUnackedMessagesOnConsumer"}
     * resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "maxUnackedMessagesOnConsumer"));
    }

    /**
     * Blocking counterpart of {@link #removeMaxUnackedMessagesOnConsumerAsync(String)}; waits at
     * most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeMaxUnackedMessagesOnConsumerAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Blocking counterpart of {@link #getInactiveTopicPoliciesAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the policies the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws PulsarAdminException {
        final CompletableFuture<InactiveTopicPolicies> pending = this.getInactiveTopicPoliciesAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "inactiveTopicPolicies"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response entity, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "inactiveTopicPolicies");
        final CompletableFuture<InactiveTopicPolicies> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<InactiveTopicPolicies>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(InactiveTopicPolicies policies) {
                result.complete(policies);
            }
        });
        return result;
    }

    /**
     * Asynchronously posts the given policies as JSON to the {@code "inactiveTopicPolicies"}
     * resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param inactiveTopicPolicies policies sent as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic, InactiveTopicPolicies inactiveTopicPolicies) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "inactiveTopicPolicies");
        return this.asyncPostRequest(target, Entity.entity(inactiveTopicPolicies, "application/json"));
    }

    /**
     * Blocking counterpart of
     * {@link #setInactiveTopicPoliciesAsync(String, InactiveTopicPolicies)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param inactiveTopicPolicies policies forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setInactiveTopicPolicies(String topic, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "inactiveTopicPolicies"} resource of
     * a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "inactiveTopicPolicies"));
    }

    /**
     * Blocking counterpart of {@link #removeInactiveTopicPoliciesAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeInactiveTopicPolicies(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeInactiveTopicPoliciesAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Blocking counterpart of {@link #getDelayedDeliveryPolicyAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the policies the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
        final CompletableFuture<DelayedDeliveryPolicies> pending = this.getDelayedDeliveryPolicyAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "delayedDelivery"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response entity, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "delayedDelivery");
        final CompletableFuture<DelayedDeliveryPolicies> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<DelayedDeliveryPolicies>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(DelayedDeliveryPolicies policies) {
                result.complete(policies);
            }
        });
        return result;
    }

    /**
     * Asynchronously issues a delete request on the {@code "delayedDelivery"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "delayedDelivery"));
    }

    /**
     * Blocking counterpart of {@link #removeDelayedDeliveryPolicyAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeDelayedDeliveryPolicyAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts the given policies as JSON to the {@code "delayedDelivery"} resource
     * of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param delayedDeliveryPolicies policies sent as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic, DelayedDeliveryPolicies delayedDeliveryPolicies) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "delayedDelivery");
        return this.asyncPostRequest(target, Entity.entity(delayedDeliveryPolicies, "application/json"));
    }

    /**
     * Blocking counterpart of
     * {@link #setDelayedDeliveryPolicyAsync(String, DelayedDeliveryPolicies)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param delayedDeliveryPolicies policies forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setDelayedDeliveryPolicy(String topic, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setDelayedDeliveryPolicyAsync(topic, delayedDeliveryPolicies);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Blocking counterpart of {@link #getDeduplicationEnabledAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the value the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException {
        final CompletableFuture<Boolean> pending = this.getDeduplicationEnabledAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "deduplicationEnabled"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response value, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "deduplicationEnabled");
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<Boolean>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(Boolean flag) {
                result.complete(flag);
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #enableDeduplicationAsync(String, boolean)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param enabled flag forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.enableDeduplicationAsync(topic, enabled);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts {@code enabled} as JSON to the {@code "deduplicationEnabled"} resource
     * of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param enabled value sent (boxed) as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "deduplicationEnabled");
        return this.asyncPostRequest(target, Entity.entity(Boolean.valueOf(enabled), "application/json"));
    }

    /**
     * Blocking counterpart of {@link #disableDeduplicationAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void disableDeduplication(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.disableDeduplicationAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "deduplicationEnabled"} resource of
     * a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> disableDeduplicationAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "deduplicationEnabled"));
    }

    /**
     * Blocking counterpart of {@link #getOffloadPoliciesAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the policies the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public OffloadPolicies getOffloadPolicies(String topic) throws PulsarAdminException {
        final CompletableFuture<OffloadPolicies> pending = this.getOffloadPoliciesAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "offloadPolicies"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response entity, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "offloadPolicies");
        final CompletableFuture<OffloadPolicies> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<OffloadPolicies>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(OffloadPolicies policies) {
                result.complete(policies);
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #setOffloadPoliciesAsync(String, OffloadPolicies)}; waits at
     * most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param offloadPolicies policies forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setOffloadPolicies(String topic, OffloadPolicies offloadPolicies) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setOffloadPoliciesAsync(topic, offloadPolicies);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts the given policies as JSON to the {@code "offloadPolicies"} resource
     * of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param offloadPolicies policies sent as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setOffloadPoliciesAsync(String topic, OffloadPolicies offloadPolicies) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "offloadPolicies");
        return this.asyncPostRequest(target, Entity.entity(offloadPolicies, "application/json"));
    }

    /**
     * Blocking counterpart of {@link #removeOffloadPoliciesAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeOffloadPolicies(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeOffloadPoliciesAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "offloadPolicies"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeOffloadPoliciesAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "offloadPolicies"));
    }

    /**
     * Blocking counterpart of {@link #getMaxUnackedMessagesOnSubscriptionAsync(String)}; waits at
     * most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the value the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
        final CompletableFuture<Integer> pending = this.getMaxUnackedMessagesOnSubscriptionAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "maxUnackedMessagesOnSubscription"} resource of a topic
     * through {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response value, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "maxUnackedMessagesOnSubscription");
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<Integer>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(Integer value) {
                result.complete(value);
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #setMaxUnackedMessagesOnSubscriptionAsync(String, int)};
     * waits at most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param maxNum value forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setMaxUnackedMessagesOnSubscription(String topic, int maxNum) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setMaxUnackedMessagesOnSubscriptionAsync(topic, maxNum);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts {@code maxNum} as JSON to the
     * {@code "maxUnackedMessagesOnSubscription"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param maxNum value sent (boxed) as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxUnackedMessagesOnSubscriptionAsync(String topic, int maxNum) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "maxUnackedMessagesOnSubscription");
        return this.asyncPostRequest(target, Entity.entity(Integer.valueOf(maxNum), "application/json"));
    }

    /**
     * Blocking counterpart of {@link #removeMaxUnackedMessagesOnSubscriptionAsync(String)}; waits
     * at most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeMaxUnackedMessagesOnSubscriptionAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "maxUnackedMessagesOnSubscription"}
     * resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxUnackedMessagesOnSubscriptionAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "maxUnackedMessagesOnSubscription"));
    }

    /**
     * Synchronously posts to the {@code "messageTTL"} resource of a topic, passing the TTL in the
     * {@code messageTTL} query parameter and an empty string as the JSON entity.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param messageTTLInSecond value placed in the {@code messageTTL} query parameter
     * @throws PulsarAdminException any exception raised while validating or requesting,
     *         converted through {@code getApiException}
     */
    @Override
    public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
        try {
            WebTarget target = this.topicPath(this.validateTopic(topic), "messageTTL");
            WebTarget withTtl = target.queryParam("messageTTL", messageTTLInSecond);
            this.request(withTtl).post(Entity.entity("", "application/json"), ErrorData.class);
        } catch (Exception failure) {
            throw this.getApiException(failure);
        }
    }

    /**
     * Synchronously reads the {@code "messageTTL"} resource of a topic as an integer.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the response value, unboxed to {@code int}
     * @throws PulsarAdminException any exception raised while validating, requesting or unboxing
     *         the response, converted through {@code getApiException}
     */
    @Override
    public int getMessageTTL(String topic) throws PulsarAdminException {
        try {
            WebTarget target = this.topicPath(this.validateTopic(topic), "messageTTL");
            return this.request(target).get(new GenericType<Integer>(){});
        } catch (Exception failure) {
            throw this.getApiException(failure);
        }
    }

    /**
     * Synchronously issues a DELETE on the {@code "messageTTL"} resource of a topic, with the
     * {@code messageTTL} query parameter set to {@code 0}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @throws PulsarAdminException any exception raised while validating or requesting,
     *         converted through {@code getApiException}
     */
    @Override
    public void removeMessageTTL(String topic) throws PulsarAdminException {
        try {
            WebTarget target = this.topicPath(this.validateTopic(topic), "messageTTL");
            WebTarget withTtl = target.queryParam("messageTTL", 0);
            this.request(withTtl).delete(ErrorData.class);
        } catch (Exception failure) {
            throw this.getApiException(failure);
        }
    }

    /**
     * Blocking counterpart of {@link #setRetentionAsync(String, RetentionPolicies)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param retention policies forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setRetention(String topic, RetentionPolicies retention) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setRetentionAsync(topic, retention);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts the given policies as JSON to the {@code "retention"} resource of
     * a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param retention policies sent as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies retention) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "retention");
        return this.asyncPostRequest(target, Entity.entity(retention, "application/json"));
    }

    /**
     * Blocking counterpart of {@link #getRetentionAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the policies the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public RetentionPolicies getRetention(String topic) throws PulsarAdminException {
        final CompletableFuture<RetentionPolicies> pending = this.getRetentionAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "retention"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response entity, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "retention");
        final CompletableFuture<RetentionPolicies> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<RetentionPolicies>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(RetentionPolicies policies) {
                result.complete(policies);
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #removeRetentionAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeRetention(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeRetentionAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "retention"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeRetentionAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "retention"));
    }

    /**
     * Blocking counterpart of {@link #setPersistenceAsync(String, PersistencePolicies)}; waits at
     * most {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param persistencePolicies policies forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setPersistence(String topic, PersistencePolicies persistencePolicies) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setPersistenceAsync(topic, persistencePolicies);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts the given policies as JSON to the {@code "persistence"} resource of
     * a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param persistencePolicies policies sent as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setPersistenceAsync(String topic, PersistencePolicies persistencePolicies) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "persistence");
        return this.asyncPostRequest(target, Entity.entity(persistencePolicies, "application/json"));
    }

    /**
     * Blocking counterpart of {@link #getPersistenceAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the policies the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public PersistencePolicies getPersistence(String topic) throws PulsarAdminException {
        final CompletableFuture<PersistencePolicies> pending = this.getPersistenceAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "persistence"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response entity, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "persistence");
        final CompletableFuture<PersistencePolicies> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<PersistencePolicies>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(PersistencePolicies policies) {
                result.complete(policies);
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #removePersistenceAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removePersistence(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removePersistenceAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously issues a delete request on the {@code "persistence"} resource of a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removePersistenceAsync(String topic) {
        return this.asyncDeleteRequest(this.topicPath(this.validateTopic(topic), "persistence"));
    }

    /**
     * Blocking counterpart of {@link #getDispatchRateAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @return the dispatch rate the future completed with
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public DispatchRate getDispatchRate(String topic) throws PulsarAdminException {
        final CompletableFuture<DispatchRate> pending = this.getDispatchRateAsync(topic);
        try {
            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously reads the {@code "dispatchRate"} resource of a topic through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @return future completed with the response entity, or exceptionally with
     *         {@code getApiException(throwable.getCause())} when the request fails
     */
    @Override
    public CompletableFuture<DispatchRate> getDispatchRateAsync(String topic) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "dispatchRate");
        final CompletableFuture<DispatchRate> result = new CompletableFuture<>();
        this.asyncGetRequest(target, new InvocationCallback<DispatchRate>(){

            @Override
            public void failed(Throwable error) {
                result.completeExceptionally(TopicsImpl.this.getApiException(error.getCause()));
            }

            @Override
            public void completed(DispatchRate rate) {
                result.complete(rate);
            }
        });
        return result;
    }

    /**
     * Blocking counterpart of {@link #setDispatchRateAsync(String, DispatchRate)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @param dispatchRate rate forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void setDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.setDispatchRateAsync(topic, dispatchRate);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Asynchronously posts the given rate as JSON to the {@code "dispatchRate"} resource of
     * a topic.
     *
     * @param topic topic name; checked with {@code validateTopic}
     * @param dispatchRate rate sent as the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setDispatchRateAsync(String topic, DispatchRate dispatchRate) {
        WebTarget target = this.topicPath(this.validateTopic(topic), "dispatchRate");
        return this.asyncPostRequest(target, Entity.entity(dispatchRate, "application/json"));
    }

    /**
     * Blocking counterpart of {@link #removeDispatchRateAsync(String)}; waits at most
     * {@code readTimeoutMs} milliseconds.
     *
     * @param topic topic name forwarded to the asynchronous variant
     * @throws PulsarAdminException the cause of a failed future (cast as-is), a wrapper around an
     *         interruption (the interrupt flag is restored first), or a
     *         {@link PulsarAdminException.TimeoutException} when the wait times out
     */
    @Override
    public void removeDispatchRate(String topic) throws PulsarAdminException {
        final CompletableFuture<Void> pending = this.removeDispatchRateAsync(topic);
        try {
            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failed) {
            throw (PulsarAdminException) failed.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code dispatchRate} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeDispatchRateAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "dispatchRate"));
    }

    /**
     * Synchronous counterpart of {@link #getSubscriptionDispatchRateAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<DispatchRate> pending = getSubscriptionDispatchRateAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code subscriptionDispatchRate} resource through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
        final CompletableFuture<DispatchRate> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "subscriptionDispatchRate");
        asyncGetRequest(target, new InvocationCallback<DispatchRate>() {
            @Override
            public void completed(DispatchRate rate) {
                result.complete(rate);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of
     * {@link #setSubscriptionDispatchRateAsync(String, DispatchRate)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param dispatchRate dispatch rate forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setSubscriptionDispatchRateAsync(topic, dispatchRate);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends {@code dispatchRate} as an {@code application/json} entity to the topic's
     * {@code subscriptionDispatchRate} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param dispatchRate value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate) {
        WebTarget target = topicPath(validateTopic(topic), "subscriptionDispatchRate");
        Entity<DispatchRate> payload = Entity.entity(dispatchRate, "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeSubscriptionDispatchRateAsync(String)}: waits up
     * to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeSubscriptionDispatchRateAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code subscriptionDispatchRate} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "subscriptionDispatchRate"));
    }

    /**
     * Synchronous counterpart of {@link #getCompactionThresholdAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Long getCompactionThreshold(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Long> pending = getCompactionThresholdAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code compactionThreshold} resource through {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Long> getCompactionThresholdAsync(String topic) {
        final CompletableFuture<Long> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "compactionThreshold");
        asyncGetRequest(target, new InvocationCallback<Long>() {
            @Override
            public void completed(Long threshold) {
                result.complete(threshold);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setCompactionThresholdAsync(String, long)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param compactionThreshold threshold forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setCompactionThresholdAsync(topic, compactionThreshold);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code compactionThreshold} as an {@code application/json} entity to the
     * topic's {@code compactionThreshold} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param compactionThreshold value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold) {
        WebTarget target = topicPath(validateTopic(topic), "compactionThreshold");
        Entity<Long> payload = Entity.entity(Long.valueOf(compactionThreshold), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeCompactionThresholdAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeCompactionThreshold(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeCompactionThresholdAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code compactionThreshold} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeCompactionThresholdAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "compactionThreshold"));
    }

    /**
     * Synchronous counterpart of {@link #getPublishRateAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public PublishRate getPublishRate(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<PublishRate> pending = getPublishRateAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code publishRate} resource through {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
        final CompletableFuture<PublishRate> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "publishRate");
        asyncGetRequest(target, new InvocationCallback<PublishRate>() {
            @Override
            public void completed(PublishRate rate) {
                result.complete(rate);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setPublishRateAsync(String, PublishRate)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param publishRate publish rate forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setPublishRate(String topic, PublishRate publishRate) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setPublishRateAsync(topic, publishRate);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends {@code publishRate} as an {@code application/json} entity to the topic's
     * {@code publishRate} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param publishRate value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate publishRate) {
        WebTarget target = topicPath(validateTopic(topic), "publishRate");
        Entity<PublishRate> payload = Entity.entity(publishRate, "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removePublishRateAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removePublishRate(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removePublishRateAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code publishRate} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removePublishRateAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "publishRate"));
    }

    /**
     * Synchronous counterpart of {@link #getMaxConsumersPerSubscriptionAsync(String)}: waits up
     * to {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Integer getMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Integer> pending = getMaxConsumersPerSubscriptionAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code maxConsumersPerSubscription} resource through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic) {
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "maxConsumersPerSubscription");
        asyncGetRequest(target, new InvocationCallback<Integer>() {
            @Override
            public void completed(Integer limit) {
                result.complete(limit);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setMaxConsumersPerSubscriptionAsync(String, int)}:
     * waits up to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param maxConsumersPerSubscription limit forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setMaxConsumersPerSubscription(String topic, int maxConsumersPerSubscription) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setMaxConsumersPerSubscriptionAsync(topic, maxConsumersPerSubscription);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code maxConsumersPerSubscription} as an {@code application/json} entity
     * to the topic's {@code maxConsumersPerSubscription} resource through
     * {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param maxConsumersPerSubscription value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String topic, int maxConsumersPerSubscription) {
        WebTarget target = topicPath(validateTopic(topic), "maxConsumersPerSubscription");
        Entity<Integer> payload = Entity.entity(Integer.valueOf(maxConsumersPerSubscription), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeMaxConsumersPerSubscriptionAsync(String)}: waits
     * up to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeMaxConsumersPerSubscriptionAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code maxConsumersPerSubscription}
     * resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "maxConsumersPerSubscription"));
    }

    /**
     * Synchronous counterpart of {@link #getMaxProducersAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Integer getMaxProducers(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Integer> pending = getMaxProducersAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code maxProducers} resource through {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Integer> getMaxProducersAsync(String topic) {
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "maxProducers");
        asyncGetRequest(target, new InvocationCallback<Integer>() {
            @Override
            public void completed(Integer limit) {
                result.complete(limit);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setMaxProducersAsync(String, int)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param maxProducers limit forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setMaxProducers(String topic, int maxProducers) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setMaxProducersAsync(topic, maxProducers);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code maxProducers} as an {@code application/json} entity to the topic's
     * {@code maxProducers} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param maxProducers value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxProducersAsync(String topic, int maxProducers) {
        WebTarget target = topicPath(validateTopic(topic), "maxProducers");
        Entity<Integer> payload = Entity.entity(Integer.valueOf(maxProducers), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeMaxProducersAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeMaxProducers(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeMaxProducersAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code maxProducers} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxProducersAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "maxProducers"));
    }

    /**
     * Synchronous counterpart of {@link #getMaxSubscriptionsPerTopicAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Integer getMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Integer> pending = getMaxSubscriptionsPerTopicAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code maxSubscriptionsPerTopic} resource through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic) {
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "maxSubscriptionsPerTopic");
        asyncGetRequest(target, new InvocationCallback<Integer>() {
            @Override
            public void completed(Integer limit) {
                result.complete(limit);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setMaxSubscriptionsPerTopicAsync(String, int)}: waits
     * up to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param maxSubscriptionsPerTopic limit forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setMaxSubscriptionsPerTopic(String topic, int maxSubscriptionsPerTopic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setMaxSubscriptionsPerTopicAsync(topic, maxSubscriptionsPerTopic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code maxSubscriptionsPerTopic} as an {@code application/json} entity to
     * the topic's {@code maxSubscriptionsPerTopic} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param maxSubscriptionsPerTopic value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String topic, int maxSubscriptionsPerTopic) {
        WebTarget target = topicPath(validateTopic(topic), "maxSubscriptionsPerTopic");
        Entity<Integer> payload = Entity.entity(Integer.valueOf(maxSubscriptionsPerTopic), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeMaxSubscriptionsPerTopicAsync(String)}: waits up
     * to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeMaxSubscriptionsPerTopicAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code maxSubscriptionsPerTopic} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "maxSubscriptionsPerTopic"));
    }

    /**
     * Synchronous counterpart of {@link #getMaxMessageSizeAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Integer getMaxMessageSize(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Integer> pending = getMaxMessageSizeAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code maxMessageSize} resource through {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Integer> getMaxMessageSizeAsync(String topic) {
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "maxMessageSize");
        asyncGetRequest(target, new InvocationCallback<Integer>() {
            @Override
            public void completed(Integer size) {
                result.complete(size);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setMaxMessageSizeAsync(String, int)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param maxMessageSize size limit forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setMaxMessageSizeAsync(topic, maxMessageSize);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code maxMessageSize} as an {@code application/json} entity to the
     * topic's {@code maxMessageSize} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param maxMessageSize value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxMessageSizeAsync(String topic, int maxMessageSize) {
        WebTarget target = topicPath(validateTopic(topic), "maxMessageSize");
        Entity<Integer> payload = Entity.entity(Integer.valueOf(maxMessageSize), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeMaxMessageSizeAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeMaxMessageSize(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeMaxMessageSizeAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code maxMessageSize} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxMessageSizeAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "maxMessageSize"));
    }

    /**
     * Synchronous counterpart of {@link #getMaxConsumersAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Integer getMaxConsumers(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Integer> pending = getMaxConsumersAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code maxConsumers} resource through {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Integer> getMaxConsumersAsync(String topic) {
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "maxConsumers");
        asyncGetRequest(target, new InvocationCallback<Integer>() {
            @Override
            public void completed(Integer maxConsumers) {
                result.complete(maxConsumers);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setMaxConsumersAsync(String, int)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param maxConsumers limit forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setMaxConsumers(String topic, int maxConsumers) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setMaxConsumersAsync(topic, maxConsumers);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code maxConsumers} as an {@code application/json} entity to the topic's
     * {@code maxConsumers} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param maxConsumers value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setMaxConsumersAsync(String topic, int maxConsumers) {
        WebTarget target = topicPath(validateTopic(topic), "maxConsumers");
        Entity<Integer> payload = Entity.entity(Integer.valueOf(maxConsumers), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeMaxConsumersAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeMaxConsumers(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeMaxConsumersAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code maxConsumers} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeMaxConsumersAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "maxConsumers"));
    }

    /**
     * Synchronous counterpart of {@link #getDeduplicationSnapshotIntervalAsync(String)}: waits
     * up to {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public Integer getDeduplicationSnapshotInterval(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Integer> pending = getDeduplicationSnapshotIntervalAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code deduplicationSnapshotInterval} resource through
     * {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<Integer> getDeduplicationSnapshotIntervalAsync(String topic) {
        final CompletableFuture<Integer> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "deduplicationSnapshotInterval");
        asyncGetRequest(target, new InvocationCallback<Integer>() {
            @Override
            public void completed(Integer snapshotInterval) {
                result.complete(snapshotInterval);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setDeduplicationSnapshotIntervalAsync(String, int)}:
     * waits up to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param interval interval forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setDeduplicationSnapshotInterval(String topic, int interval) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setDeduplicationSnapshotIntervalAsync(topic, interval);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends the boxed {@code interval} as an {@code application/json} entity to the topic's
     * {@code deduplicationSnapshotInterval} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param interval value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setDeduplicationSnapshotIntervalAsync(String topic, int interval) {
        WebTarget target = topicPath(validateTopic(topic), "deduplicationSnapshotInterval");
        Entity<Integer> payload = Entity.entity(Integer.valueOf(interval), "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeDeduplicationSnapshotIntervalAsync(String)}:
     * waits up to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeDeduplicationSnapshotInterval(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeDeduplicationSnapshotIntervalAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code deduplicationSnapshotInterval}
     * resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeDeduplicationSnapshotIntervalAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "deduplicationSnapshotInterval"));
    }

    /**
     * Synchronous counterpart of {@link #getSubscribeRateAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the result.
     *
     * @param topic topic name forwarded to the async call
     * @return the value the async call completed with
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<SubscribeRate> pending = getSubscribeRateAsync(topic);
            return pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Requests the topic's {@code subscribeRate} resource through {@code asyncGetRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return future completed with the response, or exceptionally with the exception that
     *         {@code getApiException} derives from the failure's cause
     */
    @Override
    public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic) {
        final CompletableFuture<SubscribeRate> result = new CompletableFuture<>();
        WebTarget target = topicPath(validateTopic(topic), "subscribeRate");
        asyncGetRequest(target, new InvocationCallback<SubscribeRate>() {
            @Override
            public void completed(SubscribeRate rate) {
                result.complete(rate);
            }

            @Override
            public void failed(Throwable error) {
                Throwable cause = error.getCause();
                result.completeExceptionally(TopicsImpl.this.getApiException(cause));
            }
        });
        return result;
    }

    /**
     * Synchronous counterpart of {@link #setSubscribeRateAsync(String, SubscribeRate)}: waits up
     * to {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @param subscribeRate subscribe rate forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void setSubscribeRate(String topic, SubscribeRate subscribeRate) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = setSubscribeRateAsync(topic, subscribeRate);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Sends {@code subscribeRate} as an {@code application/json} entity to the topic's
     * {@code subscribeRate} resource through {@code asyncPostRequest}.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @param subscribeRate value wrapped into the request entity
     * @return the future returned by {@code asyncPostRequest}
     */
    @Override
    public CompletableFuture<Void> setSubscribeRateAsync(String topic, SubscribeRate subscribeRate) {
        WebTarget target = topicPath(validateTopic(topic), "subscribeRate");
        Entity<SubscribeRate> payload = Entity.entity(subscribeRate, "application/json");
        return asyncPostRequest(target, payload);
    }

    /**
     * Synchronous counterpart of {@link #removeSubscribeRateAsync(String)}: waits up to
     * {@code readTimeoutMs} milliseconds for the request to finish.
     *
     * @param topic topic name forwarded to the async call
     * @throws PulsarAdminException the async failure's cause, or a wrapper around an
     *         interruption or timeout of the wait
     */
    @Override
    public void removeSubscribeRate(String topic) throws PulsarAdminException {
        try {
            CompletableFuture<Void> pending = removeSubscribeRateAsync(topic);
            pending.get(readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            throw new PulsarAdminException.TimeoutException(timedOut);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(interrupted);
        } catch (ExecutionException failure) {
            throw (PulsarAdminException) failure.getCause();
        }
    }

    /**
     * Calls {@code asyncDeleteRequest} on the topic's {@code subscribeRate} resource.
     *
     * @param topic topic name, passed through {@code validateTopic} before the path is built
     * @return the future returned by {@code asyncDeleteRequest}
     */
    @Override
    public CompletableFuture<Void> removeSubscribeRateAsync(String topic) {
        return asyncDeleteRequest(topicPath(validateTopic(topic), "subscribeRate"));
    }
}

