package org.apache.flume.api;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-sdk-1.9.0.jar:org/apache/flume/api/NettyAvroRpcClient.class */
public class NettyAvroRpcClient extends SSLContextAwareAbstractRpcClient {
    // Pool on which Avro calls are submitted; created in connect() and shut down and nulled in close().
    private ExecutorService callTimeoutPool;
    // Held around every read and write of connState in this class.
    private final ReentrantLock stateLock = new ReentrantLock();
    // Current lifecycle state; has no initializer, so it is null until setState() is first called.
    private ConnState connState;
    // Remote endpoint, built in configure() from the configured host and port.
    private InetSocketAddress address;
    // Transport created in connect() and closed in close().
    private Transceiver transceiver;
    // Avro callback client created in connect() on top of the transceiver.
    private AvroSourceProtocol.Callback avroClient;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) NettyAvroRpcClient.class);
    // Set to true in configure() when the compression type property equals "deflate" (case-insensitive).
    private boolean enableDeflateCompression;
    // Compression level handed to the channel factory; configure() sets it to 6 unless a valid level is configured.
    private int compressionLevel;
    // Worker count handed to the channel factory; configure() normalises any value below 1 to -1.
    private int maxIoWorkers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-sdk-1.9.0.jar:org/apache/flume/api/NettyAvroRpcClient$ConnState.class */
    /** Lifecycle states of the client, guarded by {@code stateLock} in the enclosing class. */
    public enum ConnState {
        // Never assigned in the visible code; presumably the pre-configuration state — TODO confirm.
        INIT,
        // Set by connect() once the transceiver and the Avro client have been created.
        READY,
        // Set by close(), and by the public append/appendBatch methods when a send fails.
        DEAD
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-sdk-1.9.0.jar:org/apache/flume/api/NettyAvroRpcClient$PermissiveTrustManager.class */
    /**
     * {@link X509TrustManager} that performs no validation at all: both check methods return
     * without throwing and no issuers are advertised. It is installed by the channel factory
     * when {@code trustAllCerts} is enabled.
     */
    public static class PermissiveTrustManager implements X509TrustManager {

        /** Private: only code inside the enclosing top-level class can create instances. */
        private PermissiveTrustManager() {
        }

        /** Accepts any client certificate chain. */
        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) {
            // Intentionally empty: returning normally means "trusted".
        }

        /** Accepts any server certificate chain. */
        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) {
            // Intentionally empty: returning normally means "trusted".
        }

        /** @return an empty array; no particular issuer is advertised */
        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[] {};
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-sdk-1.9.0.jar:org/apache/flume/api/NettyAvroRpcClient$SSLCompressionChannelFactory.class */
    public static class SSLCompressionChannelFactory extends NioClientSocketChannelFactory {
        private final boolean enableCompression;
        private final int compressionLevel;
        private final boolean enableSsl;
        private final boolean trustAllCerts;
        private final String truststore;
        private final String truststorePassword;
        private final String truststoreType;
        private final Set<String> excludeProtocols;
        private final Set<String> includeProtocols;
        private final Set<String> excludeCipherSuites;
        private final Set<String> includeCipherSuites;

        public SSLCompressionChannelFactory(Executor executor, Executor executor2, boolean z, boolean z2, boolean z3, int i, String str, String str2, String str3, Set<String> set, Set<String> set2, Set<String> set3, Set<String> set4) {
            super(executor, executor2);
            this.enableCompression = z;
            this.enableSsl = z2;
            this.compressionLevel = i;
            this.trustAllCerts = z3;
            this.truststore = str;
            this.truststorePassword = str2;
            this.truststoreType = str3;
            this.excludeProtocols = set;
            this.includeProtocols = set2;
            this.excludeCipherSuites = set3;
            this.includeCipherSuites = set4;
        }

        public SSLCompressionChannelFactory(Executor executor, Executor executor2, boolean z, boolean z2, boolean z3, int i, String str, String str2, String str3, Set<String> set, Set<String> set2, Set<String> set3, Set<String> set4, int i2) {
            super(executor, executor2, i2);
            this.enableCompression = z;
            this.enableSsl = z2;
            this.compressionLevel = i;
            this.trustAllCerts = z3;
            this.truststore = str;
            this.truststorePassword = str2;
            this.truststoreType = str3;
            this.excludeProtocols = set;
            this.includeProtocols = set2;
            this.excludeCipherSuites = set3;
            this.includeCipherSuites = set4;
        }

        @Override // org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory, org.jboss.netty.channel.ChannelFactory
        public SocketChannel newChannel(ChannelPipeline channelPipeline) {
            TrustManager[] trustManagers;
            try {
                if (this.enableCompression) {
                    channelPipeline.addFirst("deflater", new ZlibEncoder(this.compressionLevel));
                    channelPipeline.addFirst("inflater", new ZlibDecoder());
                }
                if (this.enableSsl) {
                    if (this.trustAllCerts) {
                        NettyAvroRpcClient.logger.warn("No truststore configured, setting TrustManager to accept all server certificates");
                        trustManagers = new TrustManager[]{new PermissiveTrustManager()};
                    } else {
                        KeyStore keyStore = null;
                        if (this.truststore != null) {
                            FileInputStream fileInputStream = new FileInputStream(this.truststore);
                            keyStore = KeyStore.getInstance(this.truststoreType);
                            keyStore.load(fileInputStream, this.truststorePassword != null ? this.truststorePassword.toCharArray() : null);
                        }
                        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
                        trustManagerFactory.init(keyStore);
                        trustManagers = trustManagerFactory.getTrustManagers();
                    }
                    SSLContext sSLContext = SSLContext.getInstance("TLS");
                    sSLContext.init(null, trustManagers, null);
                    SSLEngine createSSLEngine = sSLContext.createSSLEngine();
                    createSSLEngine.setUseClientMode(true);
                    ArrayList arrayList = new ArrayList();
                    for (String str : createSSLEngine.getEnabledProtocols()) {
                        if ((this.includeProtocols.isEmpty() || this.includeProtocols.contains(str)) && !this.excludeProtocols.contains(str)) {
                            arrayList.add(str);
                        }
                    }
                    createSSLEngine.setEnabledProtocols((String[]) arrayList.toArray(new String[0]));
                    ArrayList arrayList2 = new ArrayList();
                    for (String str2 : createSSLEngine.getEnabledCipherSuites()) {
                        if ((this.includeCipherSuites.isEmpty() || this.includeCipherSuites.contains(str2)) && !this.excludeCipherSuites.contains(str2)) {
                            arrayList2.add(str2);
                        }
                    }
                    createSSLEngine.setEnabledCipherSuites((String[]) arrayList2.toArray(new String[0]));
                    NettyAvroRpcClient.logger.info("SSLEngine protocols enabled: " + Arrays.asList(createSSLEngine.getEnabledProtocols()));
                    NettyAvroRpcClient.logger.info("SSLEngine cipher suites enabled: " + Arrays.asList(createSSLEngine.getEnabledProtocols()));
                    channelPipeline.addFirst(RpcClientConfigurationConstants.CONFIG_SSL, new SslHandler(createSSLEngine));
                }
                return super.newChannel(channelPipeline);
            } catch (Exception e) {
                NettyAvroRpcClient.logger.error("Cannot create SSL channel", (Throwable) e);
                throw new RuntimeException("Cannot create SSL channel", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-sdk-1.9.0.jar:org/apache/flume/api/NettyAvroRpcClient$TransceiverThreadFactory.class */
    public static class TransceiverThreadFactory implements ThreadFactory {
        private final AtomicInteger threadId = new AtomicInteger(0);
        private final String prefix;

        public TransceiverThreadFactory(String str) {
            this.prefix = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName(this.prefix + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + this.threadId.incrementAndGet());
            return thread;
        }
    }

    /**
     * Connects using the {@code connectTimeout} field, interpreted as milliseconds.
     *
     * @throws FlumeException propagated from {@link #connect(long, TimeUnit)}
     */
    private void connect() throws FlumeException {
        connect(this.connectTimeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the call pool, the Netty channel factory, the transceiver and the Avro client,
     * then marks the client READY. On any failure the call pool is shut down and the channel
     * factory (if already created) releases its external resources before the error is rethrown.
     *
     * @param timeout connect timeout
     * @param unit    unit of {@code timeout}
     * @throws FlumeException for any failure that is not an {@link Error}
     */
    private void connect(long timeout, TimeUnit unit) throws FlumeException {
        this.callTimeoutPool = Executors.newCachedThreadPool(new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
        NioClientSocketChannelFactory channelFactory = null;
        try {
            ExecutorService bossPool = Executors.newCachedThreadPool(new TransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
            ExecutorService workerPool = Executors.newCachedThreadPool(new TransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));

            // Four combinations: (compression or SSL) x (explicit worker count or default).
            boolean needsCustomPipeline = this.enableDeflateCompression || this.enableSsl;
            boolean explicitWorkerCount = this.maxIoWorkers >= 1;
            if (needsCustomPipeline) {
                if (explicitWorkerCount) {
                    channelFactory = new SSLCompressionChannelFactory(bossPool, workerPool, this.enableDeflateCompression, this.enableSsl, this.trustAllCerts, this.compressionLevel, this.truststore, this.truststorePassword, this.truststoreType, this.excludeProtocols, this.includeProtocols, this.excludeCipherSuites, this.includeCipherSuites, this.maxIoWorkers);
                } else {
                    channelFactory = new SSLCompressionChannelFactory(bossPool, workerPool, this.enableDeflateCompression, this.enableSsl, this.trustAllCerts, this.compressionLevel, this.truststore, this.truststorePassword, this.truststoreType, this.excludeProtocols, this.includeProtocols, this.excludeCipherSuites, this.includeCipherSuites);
                }
            } else if (explicitWorkerCount) {
                channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool, this.maxIoWorkers);
            } else {
                channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);
            }

            this.transceiver = new NettyTransceiver(this.address, channelFactory, Long.valueOf(unit.toMillis(timeout)));
            this.avroClient = (AvroSourceProtocol.Callback) SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, this.transceiver);
            setState(ConnState.READY);
        } catch (Throwable failure) {
            // Release whatever was created before the failure.
            if (this.callTimeoutPool != null) {
                this.callTimeoutPool.shutdownNow();
            }
            if (channelFactory != null) {
                channelFactory.releaseExternalResources();
            }
            if (failure instanceof IOException) {
                throw new FlumeException(this + ": RPC connection error", failure);
            }
            if (failure instanceof FlumeException) {
                throw ((FlumeException) failure);
            }
            if (failure instanceof Error) {
                throw ((Error) failure);
            }
            throw new FlumeException(this + ": Unexpected exception", failure);
        }
    }

    /**
     * Shuts down the call pool (waiting up to {@code requestTimeout} ms, then forcing shutdown
     * and waiting once more), closes the transceiver and marks the client DEAD. The state is
     * set to DEAD whether or not closing the transceiver succeeds.
     *
     * @throws FlumeException if closing the transceiver raises an {@link IOException}
     */
    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public void close() throws FlumeException {
        if (this.callTimeoutPool != null) {
            this.callTimeoutPool.shutdown();
            try {
                boolean terminated = this.callTimeoutPool.awaitTermination(this.requestTimeout, TimeUnit.MILLISECONDS);
                if (!terminated) {
                    this.callTimeoutPool.shutdownNow();
                    terminated = this.callTimeoutPool.awaitTermination(this.requestTimeout, TimeUnit.MILLISECONDS);
                    if (!terminated) {
                        logger.warn(this + ": Unable to cleanly shut down call timeout pool");
                    }
                }
            } catch (InterruptedException interrupted) {
                logger.warn(this + ": Interrupted during close", interrupted);
                this.callTimeoutPool.shutdownNow();
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            this.callTimeoutPool = null;
        }

        try {
            this.transceiver.close();
        } catch (IOException closeFailure) {
            throw new FlumeException(this + ": Error closing transceiver.", closeFailure);
        } finally {
            setState(ConnState.DEAD);
        }
    }

    /**
     * @return a description of the form {@code NettyAvroRpcClient { host: H, port: P }}, built
     *         from the configured address
     */
    public String toString() {
        StringBuilder text = new StringBuilder("NettyAvroRpcClient { host: ");
        text.append(this.address.getHostName());
        text.append(", port: ");
        text.append(this.address.getPort());
        text.append(" }");
        return text.toString();
    }

    /**
     * Sends a single event using the configured request timeout. Any failure marks the client
     * DEAD before it is rethrown.
     *
     * @param event event to send
     * @throws EventDeliveryException for every failure that is not an {@link Error}
     */
    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public void append(Event event) throws EventDeliveryException {
        try {
            append(event, this.requestTimeout, TimeUnit.MILLISECONDS);
        } catch (Throwable failure) {
            setState(ConnState.DEAD);
            if (failure instanceof Error) {
                throw ((Error) failure);
            }
            if (failure instanceof TimeoutException) {
                throw new EventDeliveryException(this + ": Failed to send event. RPC request timed out after " + this.requestTimeout + "ms", failure);
            }
            throw new EventDeliveryException(this + ": Failed to send event", failure);
        }
    }

    /**
     * Sends one event and waits for the server to acknowledge it with {@code Status.OK}.
     *
     * <p>The Avro call is submitted to {@code callTimeoutPool} so that its completion can be
     * bounded by {@code connectTimeout}; the server's reply is then awaited for at most
     * {@code timeout}.
     *
     * @param event   event to send
     * @param timeout maximum time to wait for the server's status reply
     * @param unit    unit of {@code timeout}
     * @throws EventDeliveryException if the client is not READY, the call cannot be submitted,
     *         the handshake fails or times out, or the reply is missing or not OK
     */
    private void append(Event event, long timeout, TimeUnit unit) throws EventDeliveryException {
        assertReady();
        final CallFuture<Status> callFuture = new CallFuture<>();
        final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
        avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
        avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));

        // Submit outside the handshake try block so the future is definitely assigned when the
        // cleanup below runs; a rejected submission has nothing to cancel.
        Future<Void> handshake;
        try {
            handshake = this.callTimeoutPool.submit(new Callable<Void>() {
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    NettyAvroRpcClient.this.avroClient.append(avroEvent, callFuture);
                    return null;
                }
            });
        } catch (RejectedExecutionException e) {
            throw new EventDeliveryException(this + ": Executor error", e);
        }

        try {
            handshake.get(this.connectTimeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new EventDeliveryException(this + ": Handshake timed out after " + this.connectTimeout + " ms", e);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interrupt, as waitForStatusOK does.
            Thread.currentThread().interrupt();
            throw new EventDeliveryException(this + ": Interrupted in handshake", e);
        } catch (ExecutionException e) {
            throw new EventDeliveryException(this + ": RPC request exception", e);
        } catch (CancellationException e) {
            throw new EventDeliveryException(this + ": RPC request cancelled", e);
        } finally {
            // Do not leave a pending call running in the pool after a failure.
            if (!handshake.isDone()) {
                handshake.cancel(true);
            }
        }

        waitForStatusOK(callFuture, timeout, unit);
    }

    /**
     * Sends a list of events using the configured request timeout. Any failure marks the
     * client DEAD before it is rethrown.
     *
     * @param list events to send
     * @throws EventDeliveryException for every failure that is not an {@link Error}
     */
    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public void appendBatch(List<Event> list) throws EventDeliveryException {
        try {
            appendBatch(list, this.requestTimeout, TimeUnit.MILLISECONDS);
        } catch (Throwable failure) {
            setState(ConnState.DEAD);
            if (failure instanceof Error) {
                throw ((Error) failure);
            }
            if (failure instanceof TimeoutException) {
                throw new EventDeliveryException(this + ": Failed to send event. RPC request timed out after " + this.requestTimeout + " ms", failure);
            }
            throw new EventDeliveryException(this + ": Failed to send batch", failure);
        }
    }

    /**
     * Sends {@code list} in chunks of at most {@code batchSize} events, waiting for
     * {@code Status.OK} after each chunk and stopping at the first failure.
     *
     * <p>Each Avro call is submitted to {@code callTimeoutPool} so that its completion can be
     * bounded by {@code connectTimeout}; the server's reply is then awaited for at most
     * {@code timeout}.
     *
     * @param list    events to send; an empty list sends nothing
     * @param timeout maximum time to wait for the status reply of each chunk
     * @param unit    unit of {@code timeout}
     * @throws EventDeliveryException if the client is not READY, a call cannot be submitted,
     *         a handshake fails or times out, or a reply is missing or not OK
     */
    private void appendBatch(List<Event> list, long timeout, TimeUnit unit) throws EventDeliveryException {
        assertReady();
        Iterator<Event> remaining = list.iterator();
        // Reused across chunks; safe because each chunk is fully acknowledged before the next clear().
        final List<AvroFlumeEvent> chunk = new LinkedList<AvroFlumeEvent>();
        while (remaining.hasNext()) {
            chunk.clear();
            for (int i = 0; i < this.batchSize && remaining.hasNext(); i++) {
                Event next = remaining.next();
                AvroFlumeEvent avroEvent = new AvroFlumeEvent();
                avroEvent.setBody(ByteBuffer.wrap(next.getBody()));
                avroEvent.setHeaders(toCharSeqMap(next.getHeaders()));
                chunk.add(avroEvent);
            }

            final CallFuture<Status> callFuture = new CallFuture<>();
            Future<Void> handshake;
            try {
                handshake = this.callTimeoutPool.submit(new Callable<Void>() {
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        NettyAvroRpcClient.this.avroClient.appendBatch(chunk, callFuture);
                        return null;
                    }
                });
            } catch (RejectedExecutionException e) {
                throw new EventDeliveryException(this + ": Executor error", e);
            }

            try {
                handshake.get(this.connectTimeout, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                throw new EventDeliveryException(this + ": Handshake timed out after " + this.connectTimeout + "ms", e);
            } catch (InterruptedException e) {
                // Restore the flag so callers can still observe the interrupt, as waitForStatusOK does.
                Thread.currentThread().interrupt();
                throw new EventDeliveryException(this + ": Interrupted in handshake", e);
            } catch (ExecutionException e) {
                throw new EventDeliveryException(this + ": RPC request exception", e);
            } catch (CancellationException e) {
                throw new EventDeliveryException(this + ": RPC request cancelled", e);
            } finally {
                // Do not leave a pending call running in the pool after a failure.
                if (!handshake.isDone()) {
                    handshake.cancel(true);
                }
            }

            waitForStatusOK(callFuture, timeout, unit);
        }
    }

    /**
     * Blocks until {@code callFuture} yields a status or the timeout elapses, and fails unless
     * that status is {@code Status.OK}.
     *
     * @param callFuture future completed with the server's reply
     * @param j          maximum time to wait
     * @param timeUnit   unit of {@code j}
     * @throws EventDeliveryException if the wait is interrupted, cancelled, fails or times out,
     *         or if the returned status is not OK
     */
    private void waitForStatusOK(CallFuture<Status> callFuture, long j, TimeUnit timeUnit) throws EventDeliveryException {
        Status reply;
        try {
            reply = callFuture.get(j, timeUnit);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller before translating the exception.
            Thread.currentThread().interrupt();
            throw new EventDeliveryException(this + ": RPC request interrupted", interrupted);
        } catch (CancellationException cancelled) {
            throw new EventDeliveryException(this + ": RPC future was cancelled", cancelled);
        } catch (ExecutionException remoteFailure) {
            throw new EventDeliveryException(this + ": Exception thrown from remote handler", remoteFailure);
        } catch (TimeoutException timedOut) {
            throw new EventDeliveryException(this + ": RPC request timed out", timedOut);
        }
        if (reply != Status.OK) {
            throw new EventDeliveryException(this + ": Avro RPC call returned Status: " + reply);
        }
    }

    /**
     * Updates the connection state under {@code stateLock}. Once the state is DEAD it can only
     * be "set" to DEAD again.
     *
     * @param connState the new state
     * @throws IllegalStateException if the current state is DEAD and {@code connState} is not
     */
    private void setState(ConnState connState) {
        this.stateLock.lock();
        try {
            boolean alreadyDead = this.connState == ConnState.DEAD;
            if (alreadyDead && connState != ConnState.DEAD) {
                throw new IllegalStateException("Cannot transition from CLOSED state.");
            }
            this.connState = connState;
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Verifies, under {@code stateLock}, that the client is in the READY state.
     *
     * @throws EventDeliveryException if the current state is anything other than READY
     */
    private void assertReady() throws EventDeliveryException {
        this.stateLock.lock();
        try {
            ConnState current = this.connState;
            if (current == ConnState.READY) {
                return;
            }
            throw new EventDeliveryException("RPC failed, client in an invalid state: " + current);
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Copies a {@code String}-keyed header map into a new {@code CharSequence}-typed map.
     *
     * @param map headers to copy; must not be null
     * @return a new {@link HashMap} holding the same entries
     */
    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        // String is a CharSequence, so the copy constructor accepts the source map directly.
        return new HashMap<CharSequence, CharSequence>(map);
    }

    /**
     * @return {@code true} if, at the time of the call, the connection state is READY
     */
    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public boolean isActive() {
        boolean ready;
        this.stateLock.lock();
        try {
            ready = ConnState.READY == this.connState;
        } finally {
            this.stateLock.unlock();
        }
        return ready;
    }

    /**
     * Reads the client settings from {@code properties} and then connects.
     *
     * <p>Settings are applied in this order: batch size, target address, connect timeout,
     * request timeout, compression, SSL, max I/O workers. The whole method runs while holding
     * {@code stateLock}.
     *
     * @param properties client configuration
     * @throws FlumeException if the client is already READY or DEAD, if the host list, host
     *         entry or port is missing or malformed, or if connecting fails
     */
    @Override // org.apache.flume.api.AbstractRpcClient
    public synchronized void configure(Properties properties) throws FlumeException {
        this.stateLock.lock();
        try {
            if (this.connState == ConnState.READY || this.connState == ConnState.DEAD) {
                throw new FlumeException("This client was already configured, cannot reconfigure.");
            }
            this.batchSize = parseBatchSize(properties);
            this.address = resolveConfiguredAddress(properties);
            this.connectTimeout = readTimeoutMillis(
                    properties.getProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT),
                    RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS,
                    "Connection timeout specified less than 1s. Using default value instead.",
                    "Invalid connect timeout specified: ");
            this.requestTimeout = readTimeoutMillis(
                    properties.getProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT),
                    RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS,
                    "Request timeout specified less than 1s. Using default value instead.",
                    "Invalid request timeout specified: ");
            applyCompressionSettings(properties);
            configureSSL(properties);
            applyMaxIoWorkers(properties);
            connect();
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Resolves the target address: takes the first whitespace-separated alias from the hosts
     * property, looks up its {@code host:port} entry and parses it.
     */
    private static InetSocketAddress resolveConfiguredAddress(Properties properties) {
        String hosts = properties.getProperty(RpcClientConfigurationConstants.CONFIG_HOSTS);
        if (hosts == null || hosts.isEmpty()) {
            throw new FlumeException("Hosts list is invalid: " + hosts);
        }
        String[] aliases = hosts.split("\\s+");
        if (aliases.length > 1) {
            logger.warn("More than one hosts are specified for the default client. Only the first host will be used and others ignored. Specified: " + hosts + "; to be used: " + aliases[0]);
        }
        String hostAndPort = properties.getProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + aliases[0]);
        if (hostAndPort == null || hostAndPort.isEmpty()) {
            throw new FlumeException("Host not found: " + aliases[0]);
        }
        // Literal ':' separator; it was previously borrowed from an unrelated Thrift constant.
        String[] parts = hostAndPort.split(":");
        if (parts.length != 2) {
            throw new FlumeException("Invalid hostname: " + aliases[0]);
        }
        int port;
        try {
            // Only the port parse is guarded, so "Invalid Port" is never reported for other failures.
            port = Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
            throw new FlumeException("Invalid Port: " + parts[1], e);
        }
        return new InetSocketAddress(parts[0], port);
    }

    /**
     * Parses a timeout in milliseconds. Returns {@code defaultMillis} when the value is absent,
     * blank, not a number, or below 1000 ms; surrounding whitespace is ignored.
     */
    private static long readTimeoutMillis(String raw, long defaultMillis, String tooSmallMessage, String invalidMessagePrefix) {
        if (raw == null || raw.trim().isEmpty()) {
            return defaultMillis;
        }
        try {
            long parsed = Long.parseLong(raw.trim());
            if (parsed < 1000) {
                logger.warn(tooSmallMessage);
                return defaultMillis;
            }
            return parsed;
        } catch (NumberFormatException e) {
            logger.error(invalidMessagePrefix + raw);
            return defaultMillis;
        }
    }

    /** Enables deflate compression, with level 6 unless a valid level is configured. */
    private void applyCompressionSettings(Properties properties) {
        String type = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE);
        if (type == null || !type.equalsIgnoreCase("deflate")) {
            return;
        }
        this.enableDeflateCompression = true;
        this.compressionLevel = 6;
        String level = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL);
        if (level != null) {
            try {
                this.compressionLevel = Integer.parseInt(level);
            } catch (NumberFormatException e) {
                logger.error("Invalid compression level: " + level);
            }
        }
    }

    /** Reads the max I/O worker count; unparsable values and values below 1 become -1. */
    private void applyMaxIoWorkers(Properties properties) {
        String raw = properties.getProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS);
        if (!StringUtils.isEmpty(raw)) {
            try {
                this.maxIoWorkers = Integer.parseInt(raw);
            } catch (NumberFormatException e) {
                logger.warn("Invalid maxIOWorkers:" + raw + " Using default maxIOWorkers.");
                this.maxIoWorkers = -1;
            }
        }
        if (this.maxIoWorkers < 1) {
            logger.info("Using default maxIOWorkers");
            this.maxIoWorkers = -1;
        }
    }
}
