/*
 * Decompiled with CFR 0.152.
 */
package org.apache.servicecomb.saga.omega.connector.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
import org.apache.servicecomb.saga.omega.connector.grpc.PushBackReconnectRunnable;
import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadBalancedClusterMessageSender
implements MessageSender {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final Map<MessageSender, Long> senders = new ConcurrentHashMap<MessageSender, Long>();
    private final Collection<ManagedChannel> channels;
    private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<Runnable>();
    private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<MessageSender>();
    private final MessageSender retryableMessageSender = new RetryableMessageSender(this.availableMessageSenders);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public LoadBalancedClusterMessageSender(String[] addresses, MessageSerializer serializer, MessageDeserializer deserializer, ServiceConfig serviceConfig, MessageHandler handler, int reconnectDelay) {
        if (addresses.length == 0) {
            throw new IllegalArgumentException("No reachable cluster address provided");
        }
        this.channels = new ArrayList<ManagedChannel>(addresses.length);
        for (String address : addresses) {
            ManagedChannel channel = ManagedChannelBuilder.forTarget((String)address).usePlaintext(true).build();
            this.channels.add(channel);
            this.senders.put(new GrpcClientMessageSender(address, channel, serializer, deserializer, serviceConfig, this.errorHandlerFactory(), handler), 0L);
        }
        this.scheduleReconnectTask(reconnectDelay);
    }

    LoadBalancedClusterMessageSender(MessageSender ... messageSenders) {
        for (MessageSender sender : messageSenders) {
            this.senders.put(sender, 0L);
        }
        this.channels = Collections.emptyList();
    }

    public void onConnected() {
        this.senders.keySet().forEach(sender -> {
            try {
                sender.onConnected();
            }
            catch (Exception e) {
                log.error("Failed connecting to alpha at {}", (Object)sender.target(), (Object)e);
            }
        });
    }

    public void onDisconnected() {
        this.senders.keySet().forEach(sender -> {
            try {
                sender.onDisconnected();
            }
            catch (Exception e) {
                log.error("Failed disconnecting from alpha at {}", (Object)sender.target(), (Object)e);
            }
        });
    }

    public void close() {
        this.scheduler.shutdown();
        this.channels.forEach(ManagedChannel::shutdownNow);
    }

    public AlphaResponse send(TxEvent event) {
        while (true) {
            MessageSender messageSender = this.fastestSender();
            try {
                long startTime = System.nanoTime();
                AlphaResponse response = messageSender.send(event);
                this.senders.put(messageSender, System.nanoTime() - startTime);
                return response;
            }
            catch (OmegaException e) {
                throw e;
            }
            catch (Exception e) {
                log.error("Retry sending event {} due to failure", (Object)event, (Object)e);
                this.senders.put(messageSender, Long.MAX_VALUE);
                if (!Thread.currentThread().isInterrupted()) continue;
                throw new OmegaException("Failed to send event " + event + " due to interruption");
            }
            break;
        }
    }

    private MessageSender fastestSender() {
        return this.senders.entrySet().stream().filter(entry -> (Long)entry.getValue() < Long.MAX_VALUE).min(Comparator.comparingLong(Map.Entry::getValue)).map(Map.Entry::getKey).orElse(this.retryableMessageSender);
    }

    private void scheduleReconnectTask(int reconnectDelay) {
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.pendingTasks.take().run();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0L, reconnectDelay, TimeUnit.MILLISECONDS);
    }

    private Function<MessageSender, Runnable> errorHandlerFactory() {
        return messageSender -> {
            PushBackReconnectRunnable runnable = new PushBackReconnectRunnable((MessageSender)messageSender, this.senders, this.pendingTasks, this.availableMessageSenders);
            return () -> this.pendingTasks.offer(runnable);
        };
    }
}

