package com.nokia.dempsy.messagetransport.tcp;

import com.nokia.dempsy.messagetransport.MessageTransportException;
import com.nokia.dempsy.messagetransport.Sender;
import com.nokia.dempsy.monitoring.StatsCollector;
import com.nokia.dempsy.util.SocketTimeout;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpSender.class */
/**
 * {@link Sender} that queues outgoing messages and writes them to a TCP
 * destination from a dedicated daemon thread.
 *
 * <p>Each message is framed on the wire as a two-byte size followed by the
 * payload. When the payload is longer than {@link Short#MAX_VALUE} the
 * two-byte slot carries {@code -1} and the real length follows as a four-byte
 * int.
 *
 * <p>The connection is opened lazily by the sender thread and re-opened after
 * an {@link IOException}; the message being handled when the failure happens
 * is reported to the {@link StatsCollector} as not sent.
 */
public class TcpSender implements Sender {
    private static final Logger logger = LoggerFactory.getLogger(TcpSender.class);

    /** Size, in bytes, of the buffer placed in front of the socket's output stream. */
    private static final int OUTPUT_BUFFER_SIZE = 1048576;

    /** Written in the two-byte size slot to signal that a four-byte length follows. */
    private static final int LARGE_MESSAGE_MARKER = -1;

    private final TcpDestination destination;
    private final StatsCollector statsCollector;
    private long timeoutMillis;
    private long maxNumberOfQueuedMessages;
    private final boolean batchOutgoingMessages;
    // Written by the sender thread, read by stop() on the caller's thread.
    private volatile Socket socket = null;
    private IsLocalAddress isLocalAddress = IsLocalAddress.Unknown;
    private final AtomicReference<Thread> senderThread = new AtomicReference<>();
    private final AtomicBoolean isSenderRunning = new AtomicBoolean(false);
    private final AtomicBoolean senderKeepRunning = new AtomicBoolean(false);
    // Created by the sender thread once a socket exists; null until the first
    // successful connect. Also read by stop() on the caller's thread.
    protected volatile SocketTimeout socketTimeout = null;
    protected BlockingQueue<byte[]> sendingQueue = new LinkedBlockingQueue<>();

    /** Whether the local end of the socket has the same address as the destination. */
    public enum IsLocalAddress {
        Yes,
        No,
        Unknown
    }

    /**
     * Creates the sender and immediately starts its sending thread.
     *
     * @param tcpDestination address and port to connect to
     * @param statsCollector receives sent / not-sent notifications and a
     *        pending-queue-size gauge; may be {@code null}
     * @param j maximum number of queued messages; when the queue is larger
     *        than this the dequeued message is discarded instead of written.
     *        A negative value disables the check
     * @param j2 timeout in milliseconds handed to the {@link SocketTimeout}
     * @param z {@code true} to batch outgoing messages, flushing only when the
     *        queue is found empty; {@code false} to flush after every message
     * @throws MessageTransportException declared for callers; not thrown by
     *         the code in this constructor itself
     */
    public TcpSender(TcpDestination tcpDestination, StatsCollector statsCollector, long j, long j2, boolean z) throws MessageTransportException {
        this.destination = tcpDestination;
        this.statsCollector = statsCollector;
        this.timeoutMillis = j2;
        this.batchOutgoingMessages = z;
        this.maxNumberOfQueuedMessages = j;
        if (this.statsCollector != null) {
            this.statsCollector.setMessagesOutPendingGauge(new StatsCollector.Gauge() {
                public long value() {
                    return TcpSender.this.sendingQueue.size();
                }
            });
        }
        start();
    }

    /**
     * Sets the timeout used for sockets opened after this call.
     *
     * @param j timeout in milliseconds
     */
    public void setTimeoutMillis(long j) {
        this.timeoutMillis = j;
    }

    /**
     * Sets the queue-size threshold above which dequeued messages are discarded.
     *
     * @param j maximum number of queued messages; negative disables the check
     */
    public void setMaxNumberOfQueuedMessages(long j) {
        this.maxNumberOfQueuedMessages = j;
    }

    /**
     * Queues a message for delivery by the sender thread.
     *
     * @param bArr the message payload
     * @throws MessageTransportException if the calling thread is interrupted
     *         while enqueueing; the thread's interrupt status is restored
     *         before the exception is thrown
     */
    public void send(byte[] bArr) throws MessageTransportException {
        try {
            this.sendingQueue.put(bArr);
        } catch (InterruptedException e) {
            // Preserve the interrupt for code further up the caller's stack.
            Thread.currentThread().interrupt();
            if (this.statsCollector != null) {
                this.statsCollector.messageNotSent(bArr);
            }
            throw new MessageTransportException("Failed to enqueue message to " + this.destination + ".", e);
        }
    }

    /** Creates and starts the daemon thread that drains {@link #sendingQueue}. */
    private void start() {
        this.senderThread.set(new Thread(new Runnable() {
            private DataOutputStream dataOutputStream = null;

            @Override
            public void run() {
                // NOTE(review): this keeps its previous value when poll()/take()
                // throws, so a failure may be recorded against the message
                // handled in the previous iteration.
                byte[] messageBytes = null;
                try {
                    isSenderRunning.set(true);
                    senderKeepRunning.set(true);
                    while (senderKeepRunning.get()) {
                        try {
                            // In batching mode poll() returns null on an empty
                            // queue, which is the cue to flush what was buffered.
                            messageBytes = batchOutgoingMessages ? sendingQueue.poll() : sendingQueue.take();
                            DataOutputStream out = getDataOutputStream();
                            if (messageBytes == null) {
                                socketTimeout.begin();
                                out.flush();
                                socketTimeout.end();
                                messageBytes = sendingQueue.take();
                            }
                            if (maxNumberOfQueuedMessages < 0 || sendingQueue.size() <= maxNumberOfQueuedMessages) {
                                int size = messageBytes.length;
                                if (size > Short.MAX_VALUE) {
                                    size = LARGE_MESSAGE_MARKER;
                                }
                                socketTimeout.begin();
                                out.writeShort(size);
                                if (size == LARGE_MESSAGE_MARKER) {
                                    out.writeInt(messageBytes.length);
                                }
                                out.write(messageBytes);
                                if (!batchOutgoingMessages) {
                                    out.flush();
                                }
                                socketTimeout.end();
                                if (statsCollector != null) {
                                    statsCollector.messageSent(messageBytes);
                                }
                            } else if (statsCollector != null) {
                                // Queue is over the configured limit: discard.
                                statsCollector.messageNotSent(messageBytes);
                            }
                        } catch (IOException e) {
                            endSocketTimeout();
                            if (statsCollector != null) {
                                statsCollector.messageNotSent(messageBytes);
                            }
                            // Drop the connection; the next iteration reconnects.
                            close();
                            logger.warn("It appears the client " + destination + " is no longer taking calls.", e);
                        } catch (InterruptedException e2) {
                            // The interrupt flag is deliberately not restored:
                            // the loop would otherwise fail every subsequent
                            // take() immediately while senderKeepRunning is true.
                            endSocketTimeout();
                            if (statsCollector != null) {
                                statsCollector.messageNotSent(messageBytes);
                            }
                            if (senderKeepRunning.get()) {
                                logger.warn("Sending data to " + destination + " was interrupted for no good reason.", e2);
                            }
                        } catch (Throwable th) {
                            endSocketTimeout();
                            if (statsCollector != null) {
                                statsCollector.messageNotSent(messageBytes);
                            }
                            logger.error("Unknown exception thrown while trying to send a message to " + destination, th);
                        }
                    }
                } finally {
                    senderThread.set(null);
                    isSenderRunning.set(false);
                    stopSocketTimeout();
                }
            }

            /**
             * Returns the stream to the destination, connecting first if there
             * is no open stream.
             *
             * @throws IOException if the connection cannot be made, or if the
             *         socket turns out to be connected to its own local port
             */
            private DataOutputStream getDataOutputStream() throws MessageTransportException, IOException {
                if (this.dataOutputStream == null) {
                    // Retire the timeout that belonged to the previous socket.
                    stopSocketTimeout();
                    Socket newSocket = makeSocket(destination);
                    socket = newSocket;
                    socketTimeout = new SocketTimeout(newSocket, timeoutMillis);
                    if (isLocalAddress == IsLocalAddress.Unknown && newSocket.isBound()) {
                        InetAddress localAddress = newSocket.getLocalAddress();
                        isLocalAddress = Arrays.equals(localAddress.getAddress(), destination.inetAddress.getAddress())
                                ? IsLocalAddress.Yes
                                : IsLocalAddress.No;
                    }
                    if (isLocalAddress == IsLocalAddress.Yes && newSocket.getLocalPort() == destination.port) {
                        throw new IOException("Connection to self same port!!!");
                    }
                    this.dataOutputStream = new DataOutputStream(
                            new BufferedOutputStream(newSocket.getOutputStream(), OUTPUT_BUFFER_SIZE));
                }
                return this.dataOutputStream;
            }

            /** Closes the stream and socket so the next send reconnects. */
            private void close() {
                if (this.dataOutputStream != null) {
                    IOUtils.closeQuietly(this.dataOutputStream);
                }
                this.dataOutputStream = null;
                closeQuietly(socket);
                socket = null;
            }
        }, "TcpSender to " + this.destination));
        this.senderThread.get().setDaemon(true);
        this.senderThread.get().start();
    }

    /**
     * Asks the sender thread to finish: clears the keep-running flag,
     * interrupts the thread, closes the socket and stops the socket timeout.
     */
    public void stop() {
        Thread thread = this.senderThread.get();
        this.senderKeepRunning.set(false);
        if (thread != null) {
            thread.interrupt();
        }
        closeQuietly(this.socket);
        stopSocketTimeout();
    }

    /**
     * Opens the socket to the destination.
     *
     * @param tcpDestination address and port to connect to
     * @return a new socket created for that address and port
     * @throws IOException if the socket cannot be created
     */
    protected Socket makeSocket(TcpDestination tcpDestination) throws IOException {
        return new Socket(tcpDestination.inetAddress, tcpDestination.port);
    }

    /**
     * Closes the socket, logging any failure at debug level instead of
     * propagating it.
     *
     * @param socket the socket to close; {@code null} is ignored
     */
    protected void closeQuietly(Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("close socket failed for " + this.destination, e);
                }
            } catch (Throwable th) {
                logger.debug("Socket close resulted in ", th);
            }
        }
    }

    /** Calls {@code end()} on the current socket timeout, if one exists yet. */
    private void endSocketTimeout() {
        SocketTimeout timeout = this.socketTimeout;
        if (timeout != null) {
            timeout.end();
        }
    }

    /** Calls {@code stop()} on the current socket timeout, if one exists yet. */
    private void stopSocketTimeout() {
        SocketTimeout timeout = this.socketTimeout;
        if (timeout != null) {
            timeout.stop();
        }
    }
}
