/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.action.search;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.search.RemoteClusterConnection;
import org.elasticsearch.action.search.RemoteConnectionInfo;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;

public final class RemoteClusterService
extends AbstractComponent
implements Closeable {
    // Map key under which groupClusterIndices collects the indices that stay on the local cluster.
    // updateRemoteClusters rejects this value as a remote cluster alias.
    static final String LOCAL_CLUSTER_GROUP_KEY = "";
    // Affix setting with prefix "search.remote." and suffix "seeds"; buildRemoteClustersSeeds uses the
    // namespace of each concrete setting as the cluster alias. Every list element is parsed by parseSeedAddress.
    public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.", "seeds", key -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterService::parseSeedAddress, Setting.Property.NodeScope, Setting.Property.Dynamic));
    // Read once in the constructor and handed to every RemoteClusterConnection that gets created.
    // NOTE(review): the arguments 3 and 1 are presumably the default and the minimum value - confirm against Setting.intSetting.
    public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope);
    // Upper bound that initializeRemoteClusters waits for the initial connections (30 seconds unless configured).
    public static final Setting<TimeValue> REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING = Setting.positiveTimeSetting("search.remote.initial_connect_timeout", TimeValue.timeValueSeconds(30L), Setting.Property.NodeScope);
    // Name of a node attribute; when this setting exists, updateRemoteClusters only accepts remote nodes
    // whose value for that attribute parses to true.
    public static final Setting<String> REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node.attr", Setting.Property.NodeScope);
    // Not read anywhere in this class.
    // NOTE(review): presumably consulted by callers to switch remote cluster connections on or off - confirm.
    public static final Setting<Boolean> ENABLE_REMOTE_CLUSTERS = Setting.boolSetting("search.remote.connect", true, Setting.Property.NodeScope);
    // Separates the cluster alias from the index name in expressions such as "alias:index".
    private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
    // Passed to every RemoteClusterConnection created by updateRemoteClusters.
    private final TransportService transportService;
    // Value of REMOTE_CONNECTIONS_PER_CLUSTER captured at construction time.
    private final int numRemoteConnections;
    // Resolves the cluster part of an index expression against the registered cluster aliases.
    private final ClusterNameExpressionResolver clusterNameResolver;
    // Registry of connections keyed by cluster alias. It is replaced wholesale by updateRemoteClusters
    // (with an unmodifiable map) rather than mutated in place, hence the volatile reference.
    private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();

    /**
     * Creates the service without connecting to anything; the registry of remote clusters starts out empty.
     *
     * @param settings         node settings, also used to read {@link #REMOTE_CONNECTIONS_PER_CLUSTER}
     * @param transportService transport service handed to every remote cluster connection
     */
    RemoteClusterService(Settings settings, TransportService transportService) {
        super(settings);
        this.transportService = transportService;
        clusterNameResolver = new ClusterNameExpressionResolver(settings);
        numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
    }

    /**
     * Applies a batch of seed-node changes to the registry of remote cluster connections.
     * <p>
     * For every entry of {@code seeds}: an empty node list closes and unregisters the connection of that
     * alias, a non-empty list creates the connection if it is missing and then updates its seed nodes.
     * The new registry is published as an unmodifiable map once all entries have been processed.
     * <p>
     * {@code connectionListener} is completed exactly once: with a response when every entry has been
     * handled (removals included), or with the first failure reported by a seed node update. An empty
     * {@code seeds} map completes the listener immediately and leaves the registry untouched.
     * <p>
     * The method is {@code synchronized}, so concurrent updates are applied one after the other; the
     * listener may be invoked while the monitor is still held.
     *
     * @param seeds              seed nodes keyed by cluster alias
     * @param connectionListener notified once all updates finished or the first one failed
     * @throws IllegalArgumentException if {@code seeds} uses the local cluster key (the empty string)
     */
    private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
        if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
            throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
        }
        if (seeds.isEmpty()) {
            // Nothing to change. The registry must stay as it is: swapping in an empty map here would
            // drop the references to open connections without ever closing them.
            connectionListener.onResponse(null);
            return;
        }
        Map<String, RemoteClusterConnection> updatedClusters = new HashMap<>(this.remoteClusters);
        CountDown countDown = new CountDown(seeds.size());
        Predicate<DiscoveryNode> nodePredicate = remoteNodePredicate();
        int removedClusters = 0;
        for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
            String clusterAlias = entry.getKey();
            List<DiscoveryNode> seedNodes = entry.getValue();
            RemoteClusterConnection remote = updatedClusters.get(clusterAlias);
            if (seedNodes.isEmpty()) {
                // No seed nodes means the cluster is being removed; remote may be null if it was never registered.
                try {
                    IOUtils.close(remote);
                } catch (IOException e) {
                    this.logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
                }
                updatedClusters.remove(clusterAlias);
                removedClusters++;
                continue;
            }
            if (remote == null) {
                remote = new RemoteClusterConnection(this.settings, clusterAlias, seedNodes, this.transportService, this.numRemoteConnections, nodePredicate);
                updatedClusters.put(clusterAlias, remote);
            }
            final RemoteClusterConnection finalRemote = remote;
            remote.updateSeedNodes(seedNodes, ActionListener.wrap(response -> {
                if (countDown.countDown()) {
                    connectionListener.onResponse(response);
                }
            }, exception -> {
                // fastForward makes sure only the first failure reaches the listener.
                if (countDown.fastForward()) {
                    connectionListener.onFailure(exception);
                }
                if (!finalRemote.isClosed()) {
                    this.logger.warn("failed to update seed list for cluster: " + clusterAlias, exception);
                }
            }));
        }
        this.remoteClusters = Collections.unmodifiableMap(updatedClusters);
        // Removals complete synchronously and have no callback of their own. They still occupy a slot in
        // the count-down, so account for them here (after the registry is published); otherwise the
        // listener would never be completed whenever the batch contains a removal.
        for (int i = 0; i < removedClusters; i++) {
            if (countDown.countDown()) {
                connectionListener.onResponse(null);
            }
        }
    }

    /**
     * Builds the filter deciding which remote nodes are eligible: nodes must be version compatible and
     * on or after 5.3.0, and, if {@link #REMOTE_NODE_ATTRIBUTE} is configured, carry that attribute
     * with a value that parses to {@code true}.
     */
    private Predicate<DiscoveryNode> remoteNodePredicate() {
        Predicate<DiscoveryNode> nodePredicate = node -> Version.CURRENT.isCompatible(node.getVersion()) && node.getVersion().onOrAfter(Version.V_5_3_0_UNRELEASED);
        if (REMOTE_NODE_ATTRIBUTE.exists(this.settings)) {
            String attribute = REMOTE_NODE_ATTRIBUTE.get(this.settings);
            nodePredicate = nodePredicate.and(node -> {
                String value = node.getAttributes().getOrDefault(attribute, "false");
                boolean booleanValue = Booleans.parseBooleanExact(value);
                if (!Booleans.isStrictlyBoolean(value)) {
                    DeprecationLogger deprecationLogger = new DeprecationLogger(this.logger);
                    deprecationLogger.deprecated("Expected a boolean [true/false] for setting [{}] but got [{}]", attribute, value);
                }
                return booleanValue;
            });
        }
        return nodePredicate;
    }

    /**
     * Returns {@code true} if at least one remote cluster is currently registered.
     */
    boolean isCrossClusterSearchEnabled() {
        return this.remoteClusters.isEmpty() == false;
    }

    /**
     * Asks the connection registered for {@code remoteCluster} whether it is connected to {@code node}.
     *
     * @param remoteCluster alias of a registered remote cluster
     * @param node          the node to check
     * @return the answer of the cluster connection's {@code isNodeConnected}
     * @throws IllegalArgumentException if no cluster is registered under {@code remoteCluster}
     */
    boolean isRemoteNodeConnected(String remoteCluster, DiscoveryNode node) {
        RemoteClusterConnection connection = this.remoteClusters.get(remoteCluster);
        if (connection == null) {
            // Same failure mode as getConnection/collectSearchShards instead of a bare NullPointerException.
            throw new IllegalArgumentException("no such remote cluster: " + remoteCluster);
        }
        return connection.isNodeConnected(node);
    }

    /**
     * Splits index expressions into per-cluster groups.
     * <p>
     * An expression of the form {@code cluster:index} whose cluster part resolves to at least one
     * registered remote cluster is added (without the cluster prefix) to the group of every matching
     * cluster. All other expressions are collected unchanged under {@link #LOCAL_CLUSTER_GROUP_KEY}.
     *
     * @param requestIndices the index expressions of the request
     * @param indexExists    tells whether an index with the given (full) name exists locally
     * @return index names keyed by cluster alias; local indices use the empty-string key
     * @throws IllegalArgumentException if an expression matches a remote cluster and is also the name of
     *                                  an existing local index
     */
    Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
        final Map<String, List<String>> grouped = new HashMap<>();
        final Set<String> registeredClusters = this.remoteClusters.keySet();
        for (String expression : requestIndices) {
            final int separator = expression.indexOf(':');
            String clusterPart = null;
            List<String> matchingClusters = Collections.emptyList();
            if (separator >= 0) {
                clusterPart = expression.substring(0, separator);
                matchingClusters = this.clusterNameResolver.resolveClusterNames(registeredClusters, clusterPart);
            }
            if (matchingClusters.isEmpty()) {
                // No separator, or the prefix is not a known cluster: treat the whole expression as local.
                grouped.computeIfAbsent(LOCAL_CLUSTER_GROUP_KEY, key -> new ArrayList<>()).add(expression);
                continue;
            }
            if (indexExists.test(expression)) {
                throw new IllegalArgumentException("Can not filter indices; index " + expression + " exists but there is also a remote cluster named: " + clusterPart);
            }
            final String remoteIndexName = expression.substring(separator + 1);
            for (String cluster : matchingClusters) {
                grouped.computeIfAbsent(cluster, key -> new ArrayList<>()).add(remoteIndexName);
            }
        }
        return grouped;
    }

    /**
     * Returns {@code true} if a remote cluster is registered under exactly the given alias.
     */
    boolean isRemoteClusterRegistered(String clusterName) {
        final Map<String, RemoteClusterConnection> registered = this.remoteClusters;
        return registered.containsKey(clusterName);
    }

    /**
     * Fetches the search shards of every requested remote cluster and reports the combined outcome.
     * <p>
     * One request is sent per entry of {@code remoteIndicesByCluster}. Once all of them have answered,
     * {@code listener} receives either the responses keyed by cluster alias or, if at least one request
     * failed, a {@link TransportException} (the failures of the other clusters are attached as
     * suppressed exceptions). An empty input map completes the listener right away with an empty map.
     *
     * @param searchRequest          the search request the shards are fetched for
     * @param remoteIndicesByCluster original indices keyed by remote cluster alias
     * @param listener               receives the per-cluster responses or the failure
     * @throws IllegalArgumentException if one of the aliases is not a registered remote cluster; this is
     *                                  checked before any request is sent
     */
    void collectSearchShards(SearchRequest searchRequest, Map<String, OriginalIndices> remoteIndicesByCluster, final ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
        if (remoteIndicesByCluster.isEmpty()) {
            // Without any request there is no callback that could complete the listener later on.
            listener.onResponse(Collections.emptyMap());
            return;
        }
        // Work on a single snapshot of the registry so validation and dispatch see the same clusters.
        final Map<String, RemoteClusterConnection> clusters = this.remoteClusters;
        for (String clusterName : remoteIndicesByCluster.keySet()) {
            // Validate everything first: failing half-way would leave requests in flight whose
            // results can never be delivered.
            if (clusters.get(clusterName) == null) {
                throw new IllegalArgumentException("no such remote cluster: " + clusterName);
            }
        }
        final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
        final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
        final AtomicReference<TransportException> transportException = new AtomicReference<>();
        for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
            final String clusterName = entry.getKey();
            final RemoteClusterConnection remoteClusterConnection = clusters.get(clusterName);
            final String[] indices = entry.getValue().indices();
            remoteClusterConnection.fetchSearchShards(searchRequest, indices, new ActionListener<ClusterSearchShardsResponse>() {

                @Override
                public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
                    if (responsesCountDown.countDown()) {
                        // Last answer: succeed only if no other cluster reported a failure.
                        TransportException failure = transportException.get();
                        if (failure == null) {
                            listener.onResponse(searchShardsResponses);
                        } else {
                            listener.onFailure(failure);
                        }
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    TransportException exception = new TransportException("unable to communicate with remote cluster [" + clusterName + "]", e);
                    if (!transportException.compareAndSet(null, exception)) {
                        // Another cluster failed before: keep the newest failure and attach the earlier one.
                        exception = transportException.accumulateAndGet(exception, (previous, current) -> {
                            current.addSuppressed(previous);
                            return current;
                        });
                    }
                    if (responsesCountDown.countDown()) {
                        listener.onFailure(exception);
                    }
                }
            });
        }
    }

    /**
     * Turns the shard responses of the remote clusters into shard iterators and alias filters.
     * <p>
     * For every shard group of every response a {@link SearchShardIterator} is appended to
     * {@code remoteShardIterators}; its index name is prefixed with the cluster alias and a colon. The
     * alias filter of the group's index is stored in {@code aliasFilterMap} under the index UUID; when a
     * response carries no filters at all, an empty filter is stored instead.
     *
     * @param searchShardsResponses  shard responses keyed by cluster alias
     * @param remoteIndicesByCluster original indices keyed by cluster alias
     * @param remoteShardIterators   output list the created iterators are added to
     * @param aliasFilterMap         output map from index UUID to alias filter
     * @return a function resolving a remote node id to a connection; it throws
     *         {@link IllegalArgumentException} for node ids that were not part of any response
     */
    Function<String, Transport.Connection> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses, Map<String, OriginalIndices> remoteIndicesByCluster, List<SearchShardIterator> remoteShardIterators, Map<String, AliasFilter> aliasFilterMap) {
        final Map<String, Supplier<Transport.Connection>> connectionByNodeId = new HashMap<>();
        for (Map.Entry<String, ClusterSearchShardsResponse> responseEntry : searchShardsResponses.entrySet()) {
            final String clusterAlias = responseEntry.getKey();
            final ClusterSearchShardsResponse response = responseEntry.getValue();
            // Connections are looked up lazily, only when the returned function is actually asked for a node.
            for (DiscoveryNode remoteNode : response.getNodes()) {
                connectionByNodeId.put(remoteNode.getId(), () -> this.getConnection(remoteNode, clusterAlias));
            }
            final Map<String, AliasFilter> filtersByIndex = response.getIndicesAndFilters();
            for (ClusterSearchShardsGroup group : response.getGroups()) {
                final ShardId remoteShardId = group.getShardId();
                final Index remoteIndex = remoteShardId.getIndex();
                final Index prefixedIndex = new Index(clusterAlias + ':' + remoteIndex.getName(), remoteIndex.getUUID());
                final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
                assert originalIndices != null;
                final ShardId prefixedShardId = new ShardId(prefixedIndex, remoteShardId.getId());
                remoteShardIterators.add(new SearchShardIterator(prefixedShardId, Arrays.asList(group.getShards()), originalIndices));
                final AliasFilter aliasFilter;
                if (filtersByIndex != null) {
                    aliasFilter = filtersByIndex.get(remoteShardId.getIndexName());
                    assert aliasFilter != null;
                } else {
                    aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
                }
                aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
            }
        }
        return nodeId -> {
            final Supplier<Transport.Connection> connectionSupplier = connectionByNodeId.get(nodeId);
            if (connectionSupplier == null) {
                throw new IllegalArgumentException("unknown remote node: " + nodeId);
            }
            return connectionSupplier.get();
        };
    }

    /**
     * Returns the connection to {@code node} from the remote cluster registered as {@code cluster}.
     *
     * @throws IllegalArgumentException if no such remote cluster is registered
     */
    private Transport.Connection getConnection(DiscoveryNode node, String cluster) {
        final RemoteClusterConnection clusterConnection = this.remoteClusters.get(cluster);
        if (clusterConnection != null) {
            return clusterConnection.getConnection(node);
        }
        throw new IllegalArgumentException("no such remote cluster: " + cluster);
    }

    /**
     * Updates the seed addresses of a single remote cluster without reporting the outcome to the caller:
     * the listener passed along ignores both success and failure.
     *
     * @param clusterAlias alias of the remote cluster
     * @param addresses    new seed addresses of that cluster
     */
    void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
        final ActionListener<Void> ignoreOutcome = ActionListener.wrap(response -> {}, failure -> {});
        updateRemoteCluster(clusterAlias, addresses, ignoreOutcome);
    }

    /**
     * Updates the seed addresses of a single remote cluster.
     * <p>
     * Each address becomes a seed {@link DiscoveryNode} whose id is {@code <alias>#<transport address>}
     * and whose version is the minimum version compatible with the current one.
     *
     * @param clusterAlias       alias of the remote cluster
     * @param addresses          new seed addresses of that cluster
     * @param connectionListener notified about the outcome of the update
     */
    void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses, ActionListener<Void> connectionListener) {
        final List<DiscoveryNode> seedNodes = new ArrayList<>();
        for (InetSocketAddress address : addresses) {
            final InetSocketTransportAddress transportAddress = new InetSocketTransportAddress(address);
            final String nodeId = clusterAlias + "#" + transportAddress.toString();
            final Version seedVersion = Version.CURRENT.minimumCompatibilityVersion();
            seedNodes.add(new DiscoveryNode(nodeId, transportAddress, seedVersion));
        }
        updateRemoteClusters(Collections.singletonMap(clusterAlias, seedNodes), connectionListener);
    }

    /**
     * Reads all concrete {@link #REMOTE_CLUSTERS_SEEDS} settings and converts them into seed nodes.
     * <p>
     * The namespace of each concrete setting is used as the cluster alias. Every configured address
     * becomes a {@link DiscoveryNode} with the id {@code <alias>#<transport address>} and the minimum
     * version compatible with the current one.
     *
     * @param settings the settings to read the seeds from
     * @return seed nodes keyed by cluster alias
     */
    static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
        return REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).collect(Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, seedSetting -> {
            final String alias = REMOTE_CLUSTERS_SEEDS.getNamespace(seedSetting);
            final List<InetSocketAddress> addresses = seedSetting.get(settings);
            final List<DiscoveryNode> seedNodes = new ArrayList<>();
            for (InetSocketAddress address : addresses) {
                final InetSocketTransportAddress transportAddress = new InetSocketTransportAddress(address);
                final String nodeId = alias + "#" + transportAddress.toString();
                seedNodes.add(new DiscoveryNode(nodeId, transportAddress, Version.CURRENT.minimumCompatibilityVersion()));
            }
            return seedNodes;
        }));
    }

    /**
     * Parses a seed entry of the form {@code host:port} into a socket address.
     * <p>
     * The port is everything after the last colon, so the host part may itself contain colons. The
     * host is resolved with {@link InetAddress#getByName(String)}.
     *
     * @param remoteHost the configured seed, e.g. {@code 127.0.0.1:9300}
     * @return the resolved address
     * @throws IllegalArgumentException if the port separator or the port is missing, the host cannot be
     *                                  resolved, or the port is not a positive number
     */
    private static InetSocketAddress parseSeedAddress(String remoteHost) {
        final int portSeparator = remoteHost.lastIndexOf(':');
        // lastIndexOf never returns length(), so a trailing colon (missing port) has to be detected
        // at length() - 1; this rejects the malformed entry before any host lookup is attempted.
        if (portSeparator == -1 || portSeparator == remoteHost.length() - 1) {
            throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
        }
        final String host = remoteHost.substring(0, portSeparator);
        final InetAddress hostAddress;
        try {
            hostAddress = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("unknown host [" + host + "]", e);
        }
        final int port;
        try {
            port = Integer.parseInt(remoteHost.substring(portSeparator + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port must be a number", e);
        }
        if (port <= 0) {
            throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
        }
        return new InetSocketAddress(hostAddress, port);
    }

    /**
     * Connects to all remote clusters configured in the node settings and waits for the connections,
     * but no longer than {@link #REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING}.
     * <p>
     * A timeout is only logged. An interrupt restores the thread's interrupt flag and returns.
     *
     * @throws IllegalStateException if connecting failed for any other reason
     */
    void initializeRemoteClusters() {
        final TimeValue connectTimeout = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(this.settings);
        final PlainActionFuture<Void> connected = new PlainActionFuture<>();
        updateRemoteClusters(buildRemoteClustersSeeds(this.settings), connected);
        try {
            connected.get(connectTimeout.millis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever runs further up the stack.
            Thread.currentThread().interrupt();
        } catch (TimeoutException ex) {
            this.logger.warn("failed to connect to remote clusters within {}", connectTimeout.toString());
        } catch (Exception e) {
            throw new IllegalStateException("failed to connect to remote clusters", e);
        }
    }

    /**
     * Closes the connections of all currently registered remote clusters.
     *
     * @throws IOException if closing a connection fails
     */
    @Override
    public void close() throws IOException {
        final Collection<RemoteClusterConnection> connections = this.remoteClusters.values();
        IOUtils.close(connections);
    }

    /**
     * Collects the connection info of every registered remote cluster.
     * <p>
     * With no registered cluster the listener receives an empty list right away; otherwise each
     * connection reports into a grouped listener that was created for the number of registered clusters.
     *
     * @param listener receives the collected connection infos
     */
    public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionInfo>> listener) {
        // Work on one snapshot so the expected group size matches the connections that are asked.
        final Map<String, RemoteClusterConnection> snapshot = this.remoteClusters;
        if (snapshot.isEmpty()) {
            listener.onResponse(Collections.emptyList());
            return;
        }
        final GroupedActionListener<RemoteConnectionInfo> groupedListener = new GroupedActionListener<RemoteConnectionInfo>(listener, snapshot.size(), Collections.emptyList());
        for (RemoteClusterConnection clusterConnection : snapshot.values()) {
            clusterConnection.getConnectionInfo(groupedListener);
        }
    }
}

