/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source;

import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.base.Throwables;
import org.apache.flink.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.ipfilter.IpFilterRule;
import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
import org.jboss.netty.handler.ipfilter.PatternRule;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroSource
extends AbstractSource
implements EventDrivenSource,
Configurable,
AvroSourceProtocol {
    private static final String THREADS = "threads";
    private static final Logger logger = LoggerFactory.getLogger(AvroSource.class);
    private static final String PORT_KEY = "port";
    private static final String BIND_KEY = "bind";
    private static final String COMPRESSION_TYPE = "compression-type";
    private static final String SSL_KEY = "ssl";
    private static final String IP_FILTER_KEY = "ipFilter";
    private static final String IP_FILTER_RULES_KEY = "ipFilterRules";
    private static final String KEYSTORE_KEY = "keystore";
    private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
    private static final String KEYSTORE_TYPE_KEY = "keystore-type";
    private int port;
    private String bindAddress;
    private String compressionType;
    private String keystore;
    private String keystorePassword;
    private String keystoreType;
    private boolean enableSsl = false;
    private boolean enableIpFilter;
    private String patternRuleConfigDefinition;
    private Server server;
    private SourceCounter sourceCounter;
    private int maxThreads;
    private ScheduledExecutorService connectionCountUpdater;
    private List<IpFilterRule> rules;

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY);
        this.port = context.getInteger(PORT_KEY);
        this.bindAddress = context.getString(BIND_KEY);
        this.compressionType = context.getString(COMPRESSION_TYPE, "none");
        try {
            this.maxThreads = context.getInteger(THREADS, 0);
        }
        catch (NumberFormatException e) {
            logger.warn("AVRO source's \"threads\" property must specify an integer value.", (Object)context.getString(THREADS));
        }
        this.enableSsl = context.getBoolean(SSL_KEY, false);
        this.keystore = context.getString(KEYSTORE_KEY);
        this.keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
        this.keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
        if (this.enableSsl) {
            Preconditions.checkNotNull(this.keystore, "keystore must be specified when SSL is enabled");
            Preconditions.checkNotNull(this.keystorePassword, "keystore-password must be specified when SSL is enabled");
            try {
                KeyStore ks = KeyStore.getInstance(this.keystoreType);
                ks.load(new FileInputStream(this.keystore), this.keystorePassword.toCharArray());
            }
            catch (Exception ex) {
                throw new FlumeException("Avro source configured with invalid keystore: " + this.keystore, ex);
            }
        }
        this.enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
        if (this.enableIpFilter) {
            this.patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
            if (this.patternRuleConfigDefinition == null || this.patternRuleConfigDefinition.trim().isEmpty()) {
                throw new FlumeException("ipFilter is configured with true but ipFilterRules is not defined: ");
            }
            String[] patternRuleDefinitions = this.patternRuleConfigDefinition.split(",");
            this.rules = new ArrayList<IpFilterRule>(patternRuleDefinitions.length);
            for (String patternRuleDefinition : patternRuleDefinitions) {
                this.rules.add((IpFilterRule)this.generateRule(patternRuleDefinition));
            }
        }
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }
    }

    @Override
    public void start() {
        logger.info("Starting {}...", (Object)this);
        SpecificResponder responder = new SpecificResponder(AvroSourceProtocol.class, (Object)this);
        NioServerSocketChannelFactory socketChannelFactory = this.initSocketChannelFactory();
        ChannelPipelineFactory pipelineFactory = this.initChannelPipelineFactory();
        this.server = new NettyServer((Responder)responder, new InetSocketAddress(this.bindAddress, this.port), (ChannelFactory)socketChannelFactory, pipelineFactory, null);
        this.connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
        this.server.start();
        this.sourceCounter.start();
        super.start();
        final NettyServer srv = (NettyServer)this.server;
        this.connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                AvroSource.this.sourceCounter.setOpenConnectionCount(srv.getNumActiveConnections());
            }
        }, 0L, 60L, TimeUnit.SECONDS);
        logger.info("Avro source {} started.", (Object)this.getName());
    }

    private NioServerSocketChannelFactory initSocketChannelFactory() {
        NioServerSocketChannelFactory socketChannelFactory = this.maxThreads <= 0 ? new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() + " Boss-%d").build()), (Executor)Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() + "  I/O Worker-%d").build())) : new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() + " Boss-%d").build()), (Executor)Executors.newFixedThreadPool(this.maxThreads, new ThreadFactoryBuilder().setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() + "  I/O Worker-%d").build()));
        return socketChannelFactory;
    }

    private ChannelPipelineFactory initChannelPipelineFactory() {
        boolean enableCompression = this.compressionType.equalsIgnoreCase("deflate");
        Object pipelineFactory = enableCompression || this.enableSsl || this.enableIpFilter ? new AdvancedChannelPipelineFactory(enableCompression, this.enableSsl, this.keystore, this.keystorePassword, this.keystoreType, this.enableIpFilter, this.patternRuleConfigDefinition) : new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline();
            }
        };
        return pipelineFactory;
    }

    @Override
    public void stop() {
        logger.info("Avro source {} stopping: {}", (Object)this.getName(), (Object)this);
        this.server.close();
        try {
            this.server.join();
        }
        catch (InterruptedException e) {
            logger.info("Avro source " + this.getName() + ": Interrupted while waiting " + "for Avro server to stop. Exiting. Exception follows.", (Throwable)e);
        }
        this.sourceCounter.stop();
        this.connectionCountUpdater.shutdown();
        while (!this.connectionCountUpdater.isTerminated()) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ex) {
                logger.error("Interrupted while waiting for connection count executor to terminate", (Throwable)ex);
                Throwables.propagate(ex);
            }
        }
        super.stop();
        logger.info("Avro source {} stopped. Metrics: {}", (Object)this.getName(), (Object)this.sourceCounter);
    }

    @Override
    public String toString() {
        return "Avro source " + this.getName() + ": { bindAddress: " + this.bindAddress + ", port: " + this.port + " }";
    }

    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
        HashMap<String, String> stringMap = new HashMap<String, String>();
        for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
            stringMap.put(((Object)entry.getKey()).toString(), ((Object)entry.getValue()).toString());
        }
        return stringMap;
    }

    @Override
    public Status append(AvroFlumeEvent avroEvent) {
        logger.debug("Avro source {}: Received avro event: {}", (Object)this.getName(), (Object)avroEvent);
        this.sourceCounter.incrementAppendReceivedCount();
        this.sourceCounter.incrementEventReceivedCount();
        Event event = EventBuilder.withBody(avroEvent.getBody().array(), AvroSource.toStringMap(avroEvent.getHeaders()));
        try {
            this.getChannelProcessor().processEvent(event);
        }
        catch (ChannelException ex) {
            logger.warn("Avro source " + this.getName() + ": Unable to process event. " + "Exception follows.", (Throwable)ex);
            return Status.FAILED;
        }
        this.sourceCounter.incrementAppendAcceptedCount();
        this.sourceCounter.incrementEventAcceptedCount();
        return Status.OK;
    }

    @Override
    public Status appendBatch(List<AvroFlumeEvent> events) {
        logger.debug("Avro source {}: Received avro event batch of {} events.", (Object)this.getName(), (Object)events.size());
        this.sourceCounter.incrementAppendBatchReceivedCount();
        this.sourceCounter.addToEventReceivedCount(events.size());
        ArrayList<Event> batch = new ArrayList<Event>();
        for (AvroFlumeEvent avroEvent : events) {
            Event event = EventBuilder.withBody(avroEvent.getBody().array(), AvroSource.toStringMap(avroEvent.getHeaders()));
            batch.add(event);
        }
        try {
            this.getChannelProcessor().processEventBatch(batch);
        }
        catch (Throwable t) {
            logger.error("Avro source " + this.getName() + ": Unable to process event " + "batch. Exception follows.", t);
            if (t instanceof Error) {
                throw (Error)t;
            }
            return Status.FAILED;
        }
        this.sourceCounter.incrementAppendBatchAcceptedCount();
        this.sourceCounter.addToEventAcceptedCount(events.size());
        return Status.OK;
    }

    private PatternRule generateRule(String patternRuleDefinition) throws FlumeException {
        int firstColonIndex = (patternRuleDefinition = patternRuleDefinition.trim()).indexOf(":");
        if (firstColonIndex == -1) {
            throw new FlumeException("Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow'  or 'deny'>:<'ip' or " + "'name'>:<pattern>");
        }
        String ruleAccessFlag = patternRuleDefinition.substring(0, firstColonIndex);
        int secondColonIndex = patternRuleDefinition.indexOf(":", firstColonIndex + 1);
        if (!ruleAccessFlag.equals("allow") && !ruleAccessFlag.equals("deny") || secondColonIndex == -1) {
            throw new FlumeException("Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow'  or 'deny'>:<'ip' or " + "'name'>:<pattern>");
        }
        String patternTypeFlag = patternRuleDefinition.substring(firstColonIndex + 1, secondColonIndex);
        if (!patternTypeFlag.equals("ip") && !patternTypeFlag.equals("name")) {
            throw new FlumeException("Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow'  or 'deny'>:<'ip' or " + "'name'>:<pattern>");
        }
        boolean isAllow = ruleAccessFlag.equals("allow");
        String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n") + ":" + patternRuleDefinition.substring(secondColonIndex + 1);
        logger.info("Adding ipFilter PatternRule: " + (isAllow ? "Allow" : "deny") + " " + patternRuleString);
        return new PatternRule(isAllow, patternRuleString);
    }

    private class AdvancedChannelPipelineFactory
    implements ChannelPipelineFactory {
        private boolean enableCompression;
        private boolean enableSsl;
        private String keystore;
        private String keystorePassword;
        private String keystoreType;
        private boolean enableIpFilter;
        private String patternRuleConfigDefinition;

        public AdvancedChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore, String keystorePassword, String keystoreType, boolean enableIpFilter, String patternRuleConfigDefinition) {
            this.enableCompression = enableCompression;
            this.enableSsl = enableSsl;
            this.keystore = keystore;
            this.keystorePassword = keystorePassword;
            this.keystoreType = keystoreType;
            this.enableIpFilter = enableIpFilter;
            this.patternRuleConfigDefinition = patternRuleConfigDefinition;
        }

        private SSLContext createServerSSLContext() {
            try {
                KeyStore ks = KeyStore.getInstance(this.keystoreType);
                ks.load(new FileInputStream(this.keystore), this.keystorePassword.toCharArray());
                KeyManagerFactory kmf = KeyManagerFactory.getInstance(this.getAlgorithm());
                kmf.init(ks, this.keystorePassword.toCharArray());
                SSLContext serverContext = SSLContext.getInstance("TLS");
                serverContext.init(kmf.getKeyManagers(), null, null);
                return serverContext;
            }
            catch (Exception e) {
                throw new Error("Failed to initialize the server-side SSLContext", e);
            }
        }

        private String getAlgorithm() {
            String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
            if (algorithm == null) {
                algorithm = "SunX509";
            }
            return algorithm;
        }

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            if (this.enableCompression) {
                ZlibEncoder encoder = new ZlibEncoder(6);
                pipeline.addFirst("deflater", (ChannelHandler)encoder);
                pipeline.addFirst("inflater", (ChannelHandler)new ZlibDecoder());
            }
            if (this.enableSsl) {
                SSLEngine sslEngine = this.createServerSSLContext().createSSLEngine();
                sslEngine.setUseClientMode(false);
                pipeline.addFirst(AvroSource.SSL_KEY, (ChannelHandler)new SslHandler(sslEngine));
            }
            if (this.enableIpFilter) {
                logger.info("Setting up ipFilter with the following rule definition: " + this.patternRuleConfigDefinition);
                IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
                ipFilterHandler.addAll((Collection)AvroSource.this.rules);
                logger.info("Adding ipFilter with " + ipFilterHandler.size() + " rules");
                pipeline.addFirst(AvroSource.IP_FILTER_KEY, (ChannelHandler)ipFilterHandler);
            }
            return pipeline;
        }
    }
}

