/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardClusterCoordinationProtocolSender
implements ClusterCoordinationProtocolSender {
    private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class);
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final SocketConfiguration socketConfiguration;
    private final int maxThreadsPerRequest;
    private int handshakeTimeoutSeconds;

    public StandardClusterCoordinationProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext, int maxThreadsPerRequest) {
        if (socketConfiguration == null) {
            throw new IllegalArgumentException("Socket configuration may not be null.");
        }
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.handshakeTimeoutSeconds = -1;
        this.maxThreadsPerRequest = maxThreadsPerRequest;
    }

    @Override
    public void setBulletinRepository(BulletinRepository bulletinRepository) {
    }

    @Override
    public ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException {
        ProtocolMessage response;
        block7: {
            ReconnectionResponseMessage reconnectionResponseMessage;
            Socket socket = null;
            try {
                socket = this.createSocket(msg.getNodeId(), true);
                try {
                    ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                    marshaller.marshal(msg, socket.getOutputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
                }
                try {
                    ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                    response = unmarshaller.unmarshal(socket.getInputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed unmarshalling '" + (Object)((Object)ProtocolMessage.MessageType.RECONNECTION_RESPONSE) + "' protocol message due to: " + ioe, ioe);
                }
                if (ProtocolMessage.MessageType.RECONNECTION_RESPONSE != response.getType()) break block7;
                reconnectionResponseMessage = (ReconnectionResponseMessage)response;
            }
            catch (Throwable throwable) {
                SocketUtils.closeQuietly(socket);
                throw throwable;
            }
            SocketUtils.closeQuietly((Socket)socket);
            return reconnectionResponseMessage;
        }
        throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.FLOW_RESPONSE) + "' but found '" + (Object)((Object)response.getType()) + "'");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void offload(OffloadMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = this.createSocket(msg.getNodeId(), true);
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
        }
        catch (Throwable throwable) {
            SocketUtils.closeQuietly(socket);
            throw throwable;
        }
        SocketUtils.closeQuietly((Socket)socket);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void disconnect(DisconnectMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = this.createSocket(msg.getNodeId(), true);
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
        }
        catch (Throwable throwable) {
            SocketUtils.closeQuietly(socket);
            throw throwable;
        }
        SocketUtils.closeQuietly((Socket)socket);
    }

    private void setConnectionHandshakeTimeoutOnSocket(Socket socket) throws SocketException {
        if (this.handshakeTimeoutSeconds >= 0) {
            socket.setSoTimeout(this.handshakeTimeoutSeconds * 1000);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    public int getHandshakeTimeoutSeconds() {
        return this.handshakeTimeoutSeconds;
    }

    public void setHandshakeTimeout(String handshakeTimeout) {
        this.handshakeTimeoutSeconds = (int)FormatUtils.getTimeDuration((String)handshakeTimeout, (TimeUnit)TimeUnit.SECONDS);
    }

    private Socket createSocket(NodeIdentifier nodeId, boolean applyHandshakeTimeout) {
        return this.createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
    }

    private Socket createSocket(String host, int port, boolean applyHandshakeTimeout) {
        try {
            Socket socket = SocketUtils.createSocket((InetSocketAddress)InetSocketAddress.createUnresolved(host, port), (SocketConfiguration)this.socketConfiguration);
            if (applyHandshakeTimeout) {
                this.setConnectionHandshakeTimeoutOnSocket(socket);
            }
            return socket;
        }
        catch (IOException ioe) {
            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public NodeConnectionStatus requestNodeConnectionStatus(String hostname, int port) {
        byte[] msgBytes;
        Throwable throwable;
        Objects.requireNonNull(hostname);
        NodeConnectionStatusRequestMessage msg = new NodeConnectionStatusRequestMessage();
        try {
            throwable = null;
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, baos);
                msgBytes = baos.toByteArray();
            }
            catch (Throwable marshaller) {
                throwable = marshaller;
                throw marshaller;
            }
        }
        catch (IOException e) {
            throw new ProtocolException("Failed to marshal NodeIdentifierRequestMessage", e);
        }
        try {
            throwable = null;
            try (Socket socket = this.createSocket(hostname, port, true);){
                ProtocolMessage response;
                socket.getOutputStream().write(msgBytes);
                try {
                    ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                    response = unmarshaller.unmarshal(socket.getInputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed unmarshalling '" + (Object)((Object)ProtocolMessage.MessageType.RECONNECTION_RESPONSE) + "' protocol message due to: " + ioe, ioe);
                }
                if (ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_RESPONSE == response.getType()) {
                    NodeConnectionStatus nodeConnectionStatus = ((NodeConnectionStatusResponseMessage)response).getNodeConnectionStatus();
                    return nodeConnectionStatus;
                }
                throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_RESPONSE) + "' but found '" + (Object)((Object)response.getType()) + "'");
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
        }
        catch (IOException ioe) {
            throw new ProtocolException("Failed to request Node Identifer from " + hostname + ":" + port, ioe);
        }
    }

    @Override
    public void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, final NodeStatusChangeMessage msg) {
        byte[] msgBytes;
        if (nodesToNotify.isEmpty()) {
            return;
        }
        int numThreads = Math.min(nodesToNotify.size(), this.maxThreadsPerRequest);
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Object object = null;
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, baos);
                msgBytes = baos.toByteArray();
            }
            catch (Throwable marshaller) {
                object = marshaller;
                throw marshaller;
            }
            finally {
                if (baos != null) {
                    if (object != null) {
                        try {
                            baos.close();
                        }
                        catch (Throwable marshaller) {
                            ((Throwable)object).addSuppressed(marshaller);
                        }
                    } else {
                        baos.close();
                    }
                }
            }
        }
        catch (IOException e) {
            throw new ProtocolException("Failed to marshal NodeStatusChangeMessage", e);
        }
        ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ThreadFactory(){
            private final AtomicInteger counter = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = Executors.defaultThreadFactory().newThread(r);
                thread.setDaemon(true);
                thread.setName("Notify Cluster of Node Status Change-" + this.counter.incrementAndGet());
                return thread;
            }
        });
        for (final NodeIdentifier nodeId : nodesToNotify) {
            executor.submit(new Runnable(){

                @Override
                public void run() {
                    try (Socket socket = StandardClusterCoordinationProtocolSender.this.createSocket(nodeId, true);){
                        socket.getOutputStream().write(msgBytes);
                    }
                    catch (IOException ioe) {
                        throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
                    }
                    logger.debug("Notified {} of status change {}", (Object)nodeId, (Object)msg);
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10L, TimeUnit.DAYS);
        }
        catch (InterruptedException ie) {
            throw new ProtocolException(ie);
        }
    }
}

