/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.HttpClient;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LookupService} implementation that answers every query by issuing an HTTP GET
 * through an {@link HttpClient} and adapting the decoded response.
 *
 * <p>All lookups are asynchronous: failures are reported through the returned
 * {@link CompletableFuture} rather than thrown from the calling method.
 */
public class HttpLookupService implements LookupService {
    private static final Logger log = LoggerFactory.getLogger(HttpLookupService.class);

    /** Lookup path prefix used when {@link TopicName#isV2()} is false. */
    private static final String BASE_PATH_V1 = "lookup/v2/destination/";
    /** Lookup path prefix used when {@link TopicName#isV2()} is true. */
    private static final String BASE_PATH_V2 = "lookup/v2/topic/";

    private final HttpClient httpClient;
    private final boolean useTls;
    private final String listenerName;

    /**
     * Creates a lookup service backed by a new {@link HttpClient}.
     *
     * @param conf           client configuration; also supplies the TLS flag and listener name
     * @param eventLoopGroup event loop group handed to the HTTP client
     * @throws PulsarClientException if the HTTP client cannot be created
     */
    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this.httpClient = new HttpClient(conf, eventLoopGroup);
        this.useTls = conf.isUseTls();
        this.listenerName = conf.getListenerName();
    }

    /**
     * Forwards the new service URL to the underlying HTTP client.
     *
     * @param serviceUrl the service URL to use from now on
     * @throws PulsarClientException if the HTTP client rejects the URL
     */
    @Override
    public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
        this.httpClient.setServiceUrl(serviceUrl);
    }

    /**
     * Looks up the broker address for a topic.
     *
     * <p>When TLS is enabled the TLS broker URL from the response is used; otherwise the
     * plain broker URL is used, falling back to the native URL when the former is
     * {@code null}. A non-blank listener name is appended as an encoded
     * {@code listenerName} query parameter.
     *
     * @param topicName the topic to look up
     * @return a future holding a pair whose two elements are the same unresolved broker
     *         address; the future fails if the returned URL cannot be turned into an address
     */
    @Override
    public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
        String basePath = topicName.isV2() ? BASE_PATH_V2 : BASE_PATH_V1;
        String path = basePath + topicName.getLookupName();
        if (!StringUtils.isBlank(this.listenerName)) {
            path = path + "?listenerName=" + Codec.encode(this.listenerName);
        }
        return this.httpClient.get(path, LookupData.class).thenCompose(lookupData -> {
            // Kept outside the try so the failure log can show whatever was parsed so far.
            URI uri = null;
            try {
                String brokerUrl;
                if (this.useTls) {
                    brokerUrl = lookupData.getBrokerUrlTls();
                } else {
                    brokerUrl = lookupData.getBrokerUrl();
                    if (brokerUrl == null) {
                        brokerUrl = lookupData.getNativeUrl();
                    }
                }
                uri = new URI(brokerUrl);
                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
            } catch (Exception e) {
                // Covers a null/malformed URL as well as a missing host or invalid port.
                log.warn("[{}] Lookup Failed due to invalid url {}, {}", topicName, uri, e.getMessage());
                return FutureUtil.failedFuture(e);
            }
        });
    }

    /**
     * Fetches the partition metadata of a topic, always sending
     * {@code checkAllowAutoCreation=true} as a query parameter.
     *
     * @param topicName the topic whose metadata is requested
     * @return a future holding the decoded {@link PartitionedTopicMetadata}
     */
    @Override
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
        String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
        String path = String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true";
        return this.httpClient.get(path, PartitionedTopicMetadata.class);
    }

    /**
     * Returns the service URL currently held by the underlying HTTP client.
     *
     * @return the current service URL
     */
    @Override
    public String getServiceUrl() {
        return this.httpClient.getServiceUrl();
    }

    /**
     * Lists the topics of a namespace, collapsing each returned topic to its partitioned
     * topic name and dropping duplicates while keeping first-seen order.
     *
     * @param namespace the namespace to list
     * @param mode      the listing mode, sent as the {@code mode} query parameter
     * @return a future holding the de-duplicated topic names; it fails with the underlying
     *         error if the request or the name conversion fails
     */
    @Override
    public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace, CommandGetTopicsOfNamespace.Mode mode) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        String format = namespace.isV2() ? "admin/v2/namespaces/%s/topics?mode=%s" : "admin/namespaces/%s/destinations?mode=%s";
        this.httpClient.get(String.format(format, namespace, mode.toString()), String[].class).thenAccept(topics -> {
            // A LinkedHashSet de-duplicates in O(n) while preserving the response order.
            Set<String> unique = new LinkedHashSet<>();
            for (String topic : topics) {
                unique.add(TopicName.get(topic).getPartitionedTopicName());
            }
            future.complete(new ArrayList<>(unique));
        }).exceptionally(ex -> {
            log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, ex.getMessage());
            future.completeExceptionally(ex);
            return null;
        });
        return future;
    }

    /**
     * Fetches the latest schema of a topic.
     *
     * @param topicName the topic whose schema is requested
     * @return a future holding the schema, or an empty {@link Optional} if none was found
     */
    @Override
    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
        return this.getSchema(topicName, null);
    }

    /**
     * Fetches a schema of a topic, optionally at a specific version.
     *
     * <p>The version, when given, is read as a big-endian {@code long} from its first
     * {@link Long#BYTES} bytes and appended to the request path.
     *
     * @param topicName the topic whose schema is requested
     * @param version   the schema version bytes, or {@code null} for the latest schema
     * @return a future holding the schema, or an empty {@link Optional} when the request
     *         fails with a {@link PulsarClientException.NotFoundException}; the future fails
     *         with {@link SchemaSerializationException} when {@code version} is empty or
     *         shorter than {@link Long#BYTES} bytes
     */
    @Override
    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
        CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>();
        String schemaName = topicName.getSchemaName();
        String path;
        if (version == null) {
            path = String.format("admin/v2/schemas/%s/schema", schemaName);
        } else if (version.length == 0) {
            future.completeExceptionally(new SchemaSerializationException("Empty schema version"));
            return future;
        } else if (version.length < Long.BYTES) {
            // ByteBuffer.getLong() would throw BufferUnderflowException synchronously here;
            // report the malformed version through the future like every other failure.
            future.completeExceptionally(new SchemaSerializationException(
                    "Invalid schema version: expected at least " + Long.BYTES + " bytes but got " + version.length));
            return future;
        } else {
            path = String.format("admin/v2/schemas/%s/schema/%s", schemaName, ByteBuffer.wrap(version).getLong());
        }
        this.httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
            if (response.getType() == SchemaType.KEY_VALUE) {
                // Key/value schema data arrives as a string and must be converted first.
                try {
                    byte[] rawData = response.getData().getBytes(StandardCharsets.UTF_8);
                    SchemaData data = SchemaData.builder()
                            .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(rawData))
                            .type(response.getType())
                            .props(response.getProperties())
                            .build();
                    future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
                } catch (IOException err) {
                    future.completeExceptionally(err);
                }
            } else {
                future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
            }
        }).exceptionally(ex -> {
            if (ex.getCause() instanceof PulsarClientException.NotFoundException) {
                // A missing schema is an expected outcome, not an error.
                future.complete(Optional.empty());
            } else {
                String encodedVersion = version != null ? Base64.getEncoder().encodeToString(version) : null;
                log.warn("Failed to get schema for topic {} version {}", topicName, encodedVersion, ex.getCause());
                future.completeExceptionally(ex);
            }
            return null;
        });
        return future;
    }

    /**
     * Closes the underlying HTTP client.
     *
     * @throws Exception if closing the HTTP client fails
     */
    @Override
    public void close() throws Exception {
        this.httpClient.close();
    }
}

