package co.cask.cdap.internal.app.runtime.procedure;

import co.cask.cdap.api.procedure.ProcedureSpecification;
import co.cask.cdap.app.ApplicationSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.common.LogWriter;
import co.cask.cdap.common.logging.logback.CAppender;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.common.metrics.MetricsCollector;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.AbstractProgramController;
import co.cask.cdap.internal.app.runtime.DataFabricFacadeFactory;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceAnnouncer;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.RunIds;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/procedure/ProcedureProgramRunner.class */
public final class ProcedureProgramRunner implements ProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ProcedureProgramRunner.class);
    private static final int MAX_IO_THREADS = 5;
    private static final int MAX_HANDLER_THREADS = 100;
    private static final int CLOSE_CHANNEL_TIMEOUT = 5;
    private final DataFabricFacadeFactory dataFabricFacadeFactory;
    private final ServiceAnnouncer serviceAnnouncer;
    private final InetAddress hostname;
    private final MetricsCollectionService metricsCollectionService;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final DatasetFramework dsFramework;
    private final CConfiguration conf;
    private ProcedureHandlerMethodFactory handlerMethodFactory;
    private ExecutionHandler executionHandler;
    private ServerBootstrap bootstrap;
    private ChannelGroup channelGroup;
    private BasicProcedureContext procedureContext;

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/procedure/ProcedureProgramRunner$ProcedureProgramController.class */
    private final class ProcedureProgramController extends AbstractProgramController {
        private final Cancellable cancellable;

        ProcedureProgramController(Program program, RunId runId, Cancellable cancellable) {
            super(program.getName(), runId);
            this.cancellable = cancellable;
            started();
        }

        @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
        protected void doSuspend() throws Exception {
        }

        @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
        protected void doResume() throws Exception {
        }

        @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
        protected void doStop() throws Exception {
            ProcedureProgramRunner.LOG.info("Stopping procedure: " + ProcedureProgramRunner.this.procedureContext);
            this.cancellable.cancel();
            try {
                if (!ProcedureProgramRunner.this.channelGroup.close().await(5L, TimeUnit.SECONDS)) {
                    ProcedureProgramRunner.LOG.warn("Timeout when closing all channels.");
                }
                ProcedureProgramRunner.this.handlerMethodFactory.stopAndWait();
                ProcedureProgramRunner.LOG.info("Procedure stopped: " + ProcedureProgramRunner.this.procedureContext);
            } finally {
                ProcedureProgramRunner.this.bootstrap.releaseExternalResources();
                ProcedureProgramRunner.this.executionHandler.releaseExternalResources();
            }
        }

        @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
        protected void doCommand(String str, Object obj) throws Exception {
            if (ProgramOptionConstants.INSTANCES.equals(str) && (obj instanceof Integer)) {
                ProcedureProgramRunner.LOG.info("Setting procedure instance in procedure program runner.");
                ProcedureProgramRunner.this.procedureContext.setInstanceCount(((Integer) obj).intValue());
            }
        }
    }

    @Inject
    public ProcedureProgramRunner(DataFabricFacadeFactory dataFabricFacadeFactory, ServiceAnnouncer serviceAnnouncer, @Named("app.bind.address") InetAddress inetAddress, MetricsCollectionService metricsCollectionService, DiscoveryServiceClient discoveryServiceClient, DatasetFramework datasetFramework, CConfiguration cConfiguration) {
        this.dataFabricFacadeFactory = dataFabricFacadeFactory;
        this.serviceAnnouncer = serviceAnnouncer;
        this.hostname = inetAddress;
        this.metricsCollectionService = metricsCollectionService;
        this.discoveryServiceClient = discoveryServiceClient;
        this.dsFramework = datasetFramework;
        this.conf = cConfiguration;
    }

    @Inject(optional = true)
    void setLogWriter(LogWriter logWriter) {
        CAppender.logWriter = logWriter;
    }

    private BasicProcedureContextFactory createContextFactory(Program program, RunId runId, int i, int i2, Arguments arguments, ProcedureSpecification procedureSpecification, DiscoveryServiceClient discoveryServiceClient) {
        return new BasicProcedureContextFactory(program, runId, i, i2, arguments, procedureSpecification, this.metricsCollectionService, discoveryServiceClient, this.dsFramework, this.conf);
    }

    @Override // co.cask.cdap.app.runtime.ProgramRunner
    public ProgramController run(Program program, ProgramOptions programOptions) {
        try {
            ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
            Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
            ProgramType type = program.getType();
            Preconditions.checkNotNull(type, "Missing processor type.");
            Preconditions.checkArgument(type == ProgramType.PROCEDURE, "Only PROCEDURE process type is supported.");
            ProcedureSpecification procedureSpecification = applicationSpecification.getProcedures().get(program.getName());
            Preconditions.checkNotNull(procedureSpecification, "Missing ProcedureSpecification for %s", new Object[]{program.getName()});
            int parseInt = Integer.parseInt(programOptions.getArguments().getOption(ProgramOptionConstants.INSTANCE_ID, "0"));
            int instances = applicationSpecification.getProcedures().get(program.getName()).getInstances();
            Preconditions.checkArgument(instances > 0, "Invalid or missing instance count");
            RunId generate = RunIds.generate();
            BasicProcedureContextFactory createContextFactory = createContextFactory(program, generate, parseInt, instances, programOptions.getUserArguments(), procedureSpecification, this.discoveryServiceClient);
            this.procedureContext = new BasicProcedureContext(program, generate, parseInt, instances, ImmutableSet.of(), programOptions.getUserArguments(), procedureSpecification, this.metricsCollectionService, this.discoveryServiceClient, this.dsFramework, this.conf);
            this.handlerMethodFactory = new ProcedureHandlerMethodFactory(program, this.dataFabricFacadeFactory, createContextFactory);
            this.handlerMethodFactory.startAndWait();
            this.channelGroup = new DefaultChannelGroup();
            this.executionHandler = createExecutionHandler();
            this.bootstrap = createBootstrap(program, this.executionHandler, this.handlerMethodFactory, this.procedureContext.getProgramMetrics(), this.channelGroup);
            Channel bind = this.bootstrap.bind(new InetSocketAddress(this.hostname, 0));
            this.channelGroup.add(bind);
            LOG.info(String.format("Procedure server started for %s.%s listening on %s", program.getApplicationId(), program.getName(), bind.getLocalAddress()));
            return new ProcedureProgramController(program, generate, this.serviceAnnouncer.announce(getServiceName(program), ((InetSocketAddress) bind.getLocalAddress()).getPort()));
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    private ServerBootstrap createBootstrap(Program program, ExecutionHandler executionHandler, HandlerMethodFactory handlerMethodFactory, MetricsCollector metricsCollector, ChannelGroup channelGroup) {
        ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("procedure-boss-" + program.getName() + "-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("procedure-worker-" + program.getNamespaceId() + "-%d").build()), 5));
        serverBootstrap.setPipelineFactory(new ProcedurePipelineFactory(executionHandler, handlerMethodFactory, metricsCollector, channelGroup));
        return serverBootstrap;
    }

    private ExecutionHandler createExecutionHandler() {
        OrderedMemoryAwareThreadPoolExecutor orderedMemoryAwareThreadPoolExecutor = new OrderedMemoryAwareThreadPoolExecutor(MAX_HANDLER_THREADS, 0L, 0L, 60L, TimeUnit.SECONDS, new ThreadFactory() { // from class: co.cask.cdap.internal.app.runtime.procedure.ProcedureProgramRunner.1
            private final ThreadGroup threadGroup = new ThreadGroup("procedure-thread");
            private final AtomicLong count = new AtomicLong(0);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(this.threadGroup, runnable, String.format("procedure-executor-%d", Long.valueOf(this.count.getAndIncrement())));
                thread.setDaemon(true);
                return thread;
            }
        });
        orderedMemoryAwareThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return new ExecutionHandler(orderedMemoryAwareThreadPoolExecutor);
    }

    private String getServiceName(Program program) {
        return String.format("procedure.%s.%s.%s", program.getNamespaceId(), program.getApplicationId(), program.getName());
    }
}
