/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event-driven Flume source that exposes a Thrift endpoint and forwards every
 * received {@link ThriftFlumeEvent} to this source's channel processor.
 *
 * <p>The server implementation is looked up reflectively: a
 * {@code TThreadedSelectorServer} is used when that class is on the classpath,
 * otherwise a {@code TThreadPoolServer}. Both are configured with the compact
 * protocol and fast framed transports.
 *
 * <p>Configuration keys: {@value #CONFIG_BIND} (required), {@value #CONFIG_PORT}
 * (required) and {@value #CONFIG_THREADS} (optional; a value {@code <= 0} means
 * "no explicit limit").
 */
public class ThriftSource
extends AbstractSource
implements Configurable,
EventDrivenSource {
    public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class);
    public static final String CONFIG_THREADS = "threads";
    public static final String CONFIG_BIND = "bind";
    public static final String CONFIG_PORT = "port";

    /** How long {@link #start()} waits for the server to report that it is serving. */
    private static final long START_TIMEOUT_MS = 10000L;
    /** Pause between two "is the server serving yet?" checks in {@link #start()}. */
    private static final long START_POLL_INTERVAL_MS = 1000L;
    /** How long {@link #stop()} waits for the serving thread before forcing it down. */
    private static final long STOP_TIMEOUT_SECONDS = 5L;

    private Integer port;
    private String bindAddress;
    /** Configured worker-thread limit; a value {@code <= 0} means no explicit limit. */
    private int maxThreads = 0;
    private SourceCounter sourceCounter;
    private TServer server;
    private TServerTransport serverTransport;
    /** Single-thread executor whose only task is the blocking {@code server.serve()} call. */
    private ExecutorService servingExecutor;

    /**
     * Reads the port, bind address and optional thread limit from the context
     * and lazily creates the source counter.
     *
     * @param context configuration for this source
     * @throws NullPointerException if the port or the bind address is missing
     */
    @Override
    public void configure(Context context) {
        logger.info("Configuring thrift source.");
        this.port = context.getInteger(CONFIG_PORT);
        Preconditions.checkNotNull(this.port, "Port must be specified for Thrift Source.");
        this.bindAddress = context.getString(CONFIG_BIND);
        Preconditions.checkNotNull(this.bindAddress, "Bind address must be specified for Thrift Source.");
        try {
            this.maxThreads = context.getInteger(CONFIG_THREADS, 0);
        }
        catch (NumberFormatException e) {
            // A malformed thread count is not fatal: warn and keep the current limit.
            logger.warn("Thrift source's \"threads\" property must specify an integer value: " + context.getString(CONFIG_THREADS));
        }
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }
    }

    /**
     * Builds the Thrift server, runs it on a dedicated thread and blocks until
     * it reports that it is serving.
     *
     * <p>If anything goes wrong after the listening transport has been created,
     * the transport (and, once it exists, the serving thread) is released
     * before the exception is propagated.
     *
     * @throws FlumeException if no supported Thrift server class is available,
     *         the server cannot be constructed, it does not start serving within
     *         the start timeout, or the calling thread is interrupted while waiting
     */
    @Override
    public void start() {
        logger.info("Starting thrift source");
        // Decide on the configured value and leave the field untouched, so the
        // "no limit" case stays distinguishable from an explicit limit.
        final boolean unbounded = this.maxThreads <= 0;
        final int workerLimit = unbounded ? Integer.MAX_VALUE : this.maxThreads;
        // Forget any transport from an earlier run; failure cleanup below must only
        // ever touch the transport created by this invocation.
        this.serverTransport = null;
        Class<?> serverClass = null;
        Class<?> argsClass = null;
        TServer.AbstractServerArgs args = null;
        try {
            serverClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer");
            argsClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer$Args");
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Flume Thrift IPC Thread %d").build();
            // No limit -> cached pool (idle workers are reclaimed); explicit limit -> fixed pool.
            ExecutorService sourceService = unbounded
                    ? Executors.newCachedThreadPool(threadFactory)
                    : Executors.newFixedThreadPool(this.maxThreads, threadFactory);
            this.serverTransport = new TNonblockingServerSocket(new InetSocketAddress(this.bindAddress, (int)this.port));
            args = (TNonblockingServer.AbstractNonblockingServerArgs)argsClass.getConstructor(TNonblockingServerTransport.class).newInstance(this.serverTransport);
            Method m = argsClass.getDeclaredMethod("executorService", ExecutorService.class);
            m.invoke((Object)args, sourceService);
        }
        catch (ClassNotFoundException e) {
            logger.info("TThreadedSelectorServer not found, using TThreadPoolServer");
            try {
                this.serverTransport = new TServerSocket(new InetSocketAddress(this.bindAddress, (int)this.port));
                serverClass = Class.forName("org.apache.thrift.server.TThreadPoolServer");
                argsClass = Class.forName("org.apache.thrift.server.TThreadPoolServer$Args");
                args = (TServer.AbstractServerArgs)argsClass.getConstructor(TServerTransport.class).newInstance(this.serverTransport);
                Method m = argsClass.getDeclaredMethod("maxWorkerThreads", Integer.TYPE);
                m.invoke((Object)args, workerLimit);
            }
            catch (ClassNotFoundException e1) {
                this.closeTransportQuietly();
                throw new FlumeException("Cannot find TThreadSelectorServer or TThreadPoolServer. Please install a compatible version of thrift in the classpath", e1);
            }
            catch (Throwable throwable) {
                this.closeTransportQuietly();
                throw new FlumeException("Cannot start Thrift source.", throwable);
            }
        }
        catch (Throwable throwable) {
            this.closeTransportQuietly();
            throw new FlumeException("Cannot start Thrift source.", throwable);
        }
        try {
            args.protocolFactory((TProtocolFactory)new TCompactProtocol.Factory());
            args.inputTransportFactory((TTransportFactory)new TFastFramedTransport.Factory());
            args.outputTransportFactory((TTransportFactory)new TFastFramedTransport.Factory());
            args.processor(new ThriftSourceProtocol.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
            this.server = (TServer)serverClass.getConstructor(argsClass).newInstance(args);
        }
        catch (Throwable ex) {
            this.closeTransportQuietly();
            throw new FlumeException("Cannot start Thrift Source.", ex);
        }
        this.servingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss").build());
        this.servingExecutor.submit(new Runnable(){

            @Override
            public void run() {
                // Blocks for the lifetime of the server.
                ThriftSource.this.server.serve();
            }
        });
        long timeAfterStart = System.currentTimeMillis();
        while (!this.server.isServing()) {
            try {
                if (System.currentTimeMillis() - timeAfterStart >= START_TIMEOUT_MS) {
                    this.abortStart();
                    throw new FlumeException("Thrift server failed to start!");
                }
                TimeUnit.MILLISECONDS.sleep(START_POLL_INTERVAL_MS);
            }
            catch (InterruptedException e) {
                this.abortStart();
                Thread.currentThread().interrupt();
                throw new FlumeException("Interrupted while waiting for Thrift server to start.", e);
            }
        }
        this.sourceCounter.start();
        logger.info("Started Thrift source.");
        super.start();
    }

    /**
     * Stops the Thrift server, waits for the serving thread to finish (forcing
     * it down after the stop timeout) and stops the source counter.
     *
     * <p>The counter and the superclass are always stopped, even when shutting
     * down the server fails.
     *
     * @throws FlumeException if the calling thread is interrupted while waiting
     *         for the serving thread; the interrupt flag is restored first
     */
    @Override
    public void stop() {
        try {
            if (this.server != null && this.server.isServing()) {
                this.server.stop();
            }
            if (this.servingExecutor != null) {
                this.servingExecutor.shutdown();
                try {
                    if (!this.servingExecutor.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        this.servingExecutor.shutdownNow();
                    }
                }
                catch (InterruptedException e) {
                    // We can no longer wait politely: force the serving thread down,
                    // restore the interrupt flag and report the failure with its cause.
                    this.servingExecutor.shutdownNow();
                    Thread.currentThread().interrupt();
                    throw new FlumeException("Interrupted while waiting for server to be shutdown.", e);
                }
            }
        }
        finally {
            if (this.sourceCounter != null) {
                this.sourceCounter.stop();
            }
            super.stop();
        }
    }

    /**
     * Best-effort release of everything {@link #start()} created once the
     * server was submitted for serving but never became ready.
     */
    private void abortStart() {
        try {
            if (this.server != null && this.server.isServing()) {
                this.server.stop();
            }
            if (this.servingExecutor != null) {
                this.servingExecutor.shutdownNow();
            }
        }
        catch (RuntimeException cleanupFailure) {
            // Never let cleanup mask the failure that triggered it.
            logger.warn("Failed to shut down Thrift server after an unsuccessful start.", cleanupFailure);
        }
        this.closeTransportQuietly();
    }

    /**
     * Closes the listening transport created by the current {@link #start()}
     * attempt, if any, logging instead of propagating cleanup failures.
     */
    private void closeTransportQuietly() {
        TServerTransport transport = this.serverTransport;
        this.serverTransport = null;
        if (transport == null) {
            return;
        }
        try {
            transport.close();
        }
        catch (RuntimeException cleanupFailure) {
            logger.warn("Failed to close Thrift server transport after an unsuccessful start.", cleanupFailure);
        }
    }

    /**
     * Thrift service implementation: converts incoming Thrift events to Flume
     * events, hands them to the channel processor and maintains the counters.
     */
    private class ThriftSourceHandler
    implements ThriftSourceProtocol.Iface {
        private ThriftSourceHandler() {
        }

        /**
         * Delivers a single event to the channel processor.
         *
         * @param event the event received over Thrift
         * @return {@code Status.OK} on success, {@code Status.FAILED} if the
         *         channel processor threw a {@link ChannelException}
         */
        @Override
        public Status append(ThriftFlumeEvent event) throws TException {
            Event flumeEvent = EventBuilder.withBody(event.getBody(), event.getHeaders());
            ThriftSource.this.sourceCounter.incrementAppendReceivedCount();
            ThriftSource.this.sourceCounter.incrementEventReceivedCount();
            try {
                ThriftSource.this.getChannelProcessor().processEvent(flumeEvent);
            }
            catch (ChannelException ex) {
                logger.warn("Thrift source " + ThriftSource.this.getName() + " could not append events " + "to the channel.", (Throwable)ex);
                return Status.FAILED;
            }
            ThriftSource.this.sourceCounter.incrementAppendAcceptedCount();
            ThriftSource.this.sourceCounter.incrementEventAcceptedCount();
            return Status.OK;
        }

        /**
         * Delivers a batch of events to the channel processor in one call.
         *
         * @param events the events received over Thrift
         * @return {@code Status.OK} on success, {@code Status.FAILED} if the
         *         channel processor threw a {@link ChannelException}
         */
        @Override
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            ThriftSource.this.sourceCounter.incrementAppendBatchReceivedCount();
            ThriftSource.this.sourceCounter.addToEventReceivedCount(events.size());
            List<Event> flumeEvents = new ArrayList<Event>(events.size());
            for (ThriftFlumeEvent event : events) {
                flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
            }
            try {
                ThriftSource.this.getChannelProcessor().processEventBatch(flumeEvents);
            }
            catch (ChannelException ex) {
                // Same form as append(): include the source name and keep the stack trace.
                logger.warn("Thrift source " + ThriftSource.this.getName() + " could not append events to the channel.", (Throwable)ex);
                return Status.FAILED;
            }
            ThriftSource.this.sourceCounter.incrementAppendBatchAcceptedCount();
            ThriftSource.this.sourceCounter.addToEventAcceptedCount(events.size());
            return Status.OK;
        }
    }
}

