package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.app.guice.ClusterMode;
import co.cask.cdap.app.guice.DistributedProgramContainerModule;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.program.Programs;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.logging.common.UncaughtExceptionHandler;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.internal.app.ApplicationSpecificationAdapter;
import co.cask.cdap.internal.app.program.StateChangeListener;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.codec.ArgumentsCodec;
import co.cask.cdap.internal.app.runtime.codec.ProgramOptionsCodec;
import co.cask.cdap.internal.app.runtime.monitor.RuntimeMonitorServer;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.guice.MessagingServerRuntimeModule;
import co.cask.cdap.messaging.server.MessagingHttpService;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tephra.TransactionManager;
import org.apache.twill.api.Command;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.ServiceAnnouncer;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractProgramTwillRunnable.class */
public abstract class AbstractProgramTwillRunnable<T extends ProgramRunner> implements TwillRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractProgramTwillRunnable.class);
    // Gson used by readJsonFile(); carries the application-specification adapters plus codecs for
    // CDAP Arguments, ProgramOptions and Twill's own Arguments type.
    private static final Gson GSON = ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).registerTypeAdapter(Arguments.class, new ArgumentsCodec()).registerTypeAdapter(ProgramOptions.class, new ProgramOptionsCodec()).registerTypeAdapter(org.apache.twill.internal.Arguments.class, new org.apache.twill.internal.json.ArgumentsCodec()).create();
    // Runnable name; set by the constructor, or taken from the Twill specification in initialize() when null.
    protected String name;
    // Obtained from the injector in doInitialize(); initialized in startCoreServices(), closed in stopCoreServices().
    private LogAppenderInitializer logAppenderInitializer;
    // Program options read from the options file, with the extra system arguments merged in.
    private ProgramOptions programOptions;
    // Services started in insertion order by startCoreServices() and stopped in reverse order by stopCoreServices().
    private Deque<Service> coreServices;
    // Guice injector built from createModule() in doInitialize().
    private Injector injector;
    // Program runner instance created by createProgramRunner().
    private T programRunner;
    // Program created in doInitialize() and closed when run() finishes.
    private Program program;
    // Run id derived from the program id and the run id carried by the program options.
    private ProgramRunId programRunId;
    // Twill context handed to initialize().
    private TwillContext context;
    // Completed with the program controller by the listener registered in run().
    private CompletableFuture<ProgramController> controllerFuture;
    // Completed with COMPLETED/KILLED, or exceptionally on error, by the listener registered in run().
    private CompletableFuture<ProgramController.State> programCompletion;
    // Value of "app.program.max.stop.seconds"; stop() falls back to 60 seconds when this is 0.
    private long maxStopSeconds;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.internal.app.runtime.distributed.AbstractProgramTwillRunnable$3, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractProgramTwillRunnable$3.class */
    /**
     * Compiler-generated lookup tables that map enum ordinals to {@code switch} case numbers.
     *
     * <p>Each table is allocated with one slot per enum constant and then filled in. Every
     * assignment is wrapped in a {@code NoSuchFieldError} guard so that a constant missing at
     * runtime simply leaves its slot at 0 (no matching case) instead of failing class
     * initialization.
     */
    public static /* synthetic */ class AnonymousClass3 {
        // Table for ClusterMode; it was written to by the initializer below but never declared,
        // which made the class fail to compile.
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$app$guice$ClusterMode;
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$app$runtime$ProgramController$State;

        static {
            $SwitchMap$co$cask$cdap$app$guice$ClusterMode = new int[ClusterMode.values().length];
            try {
                $SwitchMap$co$cask$cdap$app$guice$ClusterMode[ClusterMode.ON_PREMISE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime; leave the slot unmapped.
            }
            try {
                $SwitchMap$co$cask$cdap$app$guice$ClusterMode[ClusterMode.ISOLATED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime; leave the slot unmapped.
            }
            $SwitchMap$co$cask$cdap$app$runtime$ProgramController$State = new int[ProgramController.State.values().length];
            try {
                $SwitchMap$co$cask$cdap$app$runtime$ProgramController$State[ProgramController.State.ALIVE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime; leave the slot unmapped.
            }
            try {
                $SwitchMap$co$cask$cdap$app$runtime$ProgramController$State[ProgramController.State.COMPLETED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime; leave the slot unmapped.
            }
            try {
                $SwitchMap$co$cask$cdap$app$runtime$ProgramController$State[ProgramController.State.KILLED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime; leave the slot unmapped.
            }
            try {
                $SwitchMap$co$cask$cdap$app$runtime$ProgramController$State[ProgramController.State.ERROR.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime; leave the slot unmapped.
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractProgramTwillRunnable$DirectExecutionTwillContext.class */
    /**
     * A {@link TwillContext} backed only by a Twill runtime specification and an arguments object.
     *
     * <p>It always reports a single instance with id 0, reports 0 for cores and memory, and
     * rejects discovery, leader election, locking and service announcement with
     * {@link UnsupportedOperationException}.
     */
    private static final class DirectExecutionTwillContext implements TwillContext {
        private final String runnableName;
        private final TwillRuntimeSpecification runtimeSpec;
        private final String[] applicationArgs;
        private final String[] args;

        /**
         * @param runnableName name of the runnable; used to look up its arguments and specification
         * @param twillRuntimeSpec runtime specification of the Twill application
         * @param twillArguments application-level and per-runnable arguments
         */
        private DirectExecutionTwillContext(String runnableName,
                                            TwillRuntimeSpecification twillRuntimeSpec,
                                            org.apache.twill.internal.Arguments twillArguments) {
            this.runnableName = runnableName;
            this.runtimeSpec = twillRuntimeSpec;
            this.applicationArgs = twillArguments.getArguments().toArray(new String[0]);
            this.args = twillArguments.getRunnableArguments().get(runnableName).toArray(new String[0]);
        }

        /** Returns the Twill application run id; the same value as {@link #getApplicationRunId()}. */
        public RunId getRunId() {
            return runtimeSpec.getTwillAppRunId();
        }

        /** Returns the Twill application run id from the runtime specification. */
        public RunId getApplicationRunId() {
            return runtimeSpec.getTwillAppRunId();
        }

        /** Always 1. */
        public int getInstanceCount() {
            return 1;
        }

        /** Returns the local host address, or the loopback address if the local host cannot be resolved. */
        public InetAddress getHost() {
            InetAddress host;
            try {
                host = InetAddress.getLocalHost();
            } catch (UnknownHostException e) {
                host = InetAddress.getLoopbackAddress();
            }
            return host;
        }

        /** Returns the arguments registered for this runnable. */
        public String[] getArguments() {
            return args;
        }

        /** Returns the application-level arguments. */
        public String[] getApplicationArguments() {
            return applicationArgs;
        }

        /** Returns the specification registered under this runnable's name in the Twill specification. */
        public TwillRunnableSpecification getSpecification() {
            RuntimeSpecification runnableRuntimeSpec =
                runtimeSpec.getTwillSpecification().getRunnables().get(runnableName);
            return runnableRuntimeSpec.getRunnableSpecification();
        }

        /** Always 0. */
        public int getInstanceId() {
            return 0;
        }

        /** Always 0. */
        public int getVirtualCores() {
            return 0;
        }

        /** Always 0. */
        public int getMaxMemoryMB() {
            return 0;
        }

        /** Not supported by this context. */
        public ServiceDiscovered discover(String str) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this context. */
        public Cancellable electLeader(String str, ElectionHandler electionHandler) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this context. */
        public Lock createLock(String str) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this context. */
        public Cancellable announce(String str, int i) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this context. */
        public Cancellable announce(String str, int i, byte[] bArr) {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the runnable name from the {@code TWILL_RUNNABLE_NAME} environment variable.
     *
     * @return the value of the environment variable
     * @throws IllegalArgumentException if the variable is not set
     */
    public static String getRunnableNameFromEnv() {
        final String runnableName = System.getenv("TWILL_RUNNABLE_NAME");
        if (runnableName != null) {
            return runnableName;
        }
        throw new IllegalArgumentException("Missing environment variable TWILL_RUNNABLE_NAME");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the runnable with the given name.
     *
     * @param str the runnable name; when {@code null}, {@code initialize} takes the name from
     *            the Twill runnable specification instead
     */
    public AbstractProgramTwillRunnable(String str) {
        name = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initializes and runs this runnable using a {@link DirectExecutionTwillContext} built from
     * {@code twillSpec.json} and {@code arguments.json} under the {@code runtime.config.jar}
     * directory.
     *
     * <p>A JVM shutdown hook that calls {@link #stop()} is registered, and once the program
     * controller is available a {@link StateChangeListener} is attached to it.
     *
     * @throws Exception if the configuration files cannot be read or initialization fails
     */
    public void doMain() throws Exception {
        TwillRuntimeSpecification twillRuntimeSpec =
            TwillRuntimeSpecificationAdapter.create().fromJson(new File("runtime.config.jar", "twillSpec.json"));
        org.apache.twill.internal.Arguments twillArguments =
            readJsonFile(new File("runtime.config.jar", "arguments.json"), org.apache.twill.internal.Arguments.class);
        initialize(new DirectExecutionTwillContext(name, twillRuntimeSpec, twillArguments));

        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));

        ProgramStateWriter stateWriter = injector.getInstance(ProgramStateWriter.class);
        // The listener is attached from a freshly started thread once the controller future completes.
        controllerFuture.thenAcceptAsync(
            controller -> controller.addListener(
                new StateChangeListener(controller.getProgramRunId(), null, stateWriter),
                Threads.SAME_THREAD_EXECUTOR),
            task -> new Thread(task).start());

        run();
    }

    /**
     * Builds the Twill specification for this runnable: its name and no configurations.
     *
     * @return the runnable specification
     */
    public TwillRunnableSpecification configure() {
        return TwillRunnableSpecification.Builder.with()
            .setName(name)
            .noConfigs()
            .build();
    }

    /**
     * Stores the Twill context, resolves the runnable name and initializes the program runtime.
     *
     * <p>The first application argument is the path of the program options file. Any failure
     * is logged together with the runnable name and then rethrown.
     *
     * @param twillContext the context supplied by Twill
     * @throws IllegalArgumentException if the application arguments do not contain the program
     *         options file
     * @throws RuntimeException if initialization fails for any other reason (checked exceptions
     *         are wrapped)
     */
    public final void initialize(TwillContext twillContext) {
        this.context = twillContext;
        if (this.name == null) {
            this.name = twillContext.getSpecification().getName();
        }
        LOG.info("Initializing runnable: {}", this.name);
        try {
            String[] applicationArguments = twillContext.getApplicationArguments();
            if (applicationArguments == null || applicationArguments.length == 0) {
                // Fail with an explicit message instead of an ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                    "Missing program options file in the application arguments of runnable " + this.name);
            }
            doInitialize(new File(applicationArguments[0]));
            LOG.info("Runnable initialized: {}", this.name);
        } catch (Throwable th) {
            // A fixed message with context; th.getMessage() may be null and is part of the stack trace anyway.
            LOG.error("Failed to initialize runnable {}", this.name, th);
            throw Throwables.propagate(th);
        }
    }

    /**
     * Sets up everything {@link #run()} needs: completion futures, JVM-wide handlers, program
     * options, configurations, the Guice injector, core services, the program runner and the
     * program itself.
     *
     * @param programOptionsFile JSON file holding the serialized program options
     * @throws Exception if any file cannot be read or any component fails to be created
     */
    private void doInitialize(File programOptionsFile) throws Exception {
        controllerFuture = new CompletableFuture<>();
        programCompletion = new CompletableFuture<>();

        // JVM-wide setup.
        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler());
        System.setSecurityManager(new ProgramContainerSecurityManager(System.getSecurityManager()));
        SLF4JBridgeHandler.install();

        programOptions = createProgramOptions(programOptionsFile);
        programRunId = programOptions.getProgramId().run(ProgramRunners.getRunId(programOptions));

        final Arguments systemArgs = programOptions.getArguments();
        final ClusterMode clusterMode = ProgramRunners.getClusterMode(programOptions);

        // Hadoop configuration: replaced by the localized file only in on-premise mode.
        final Configuration hConf = new Configuration();
        if (clusterMode == ClusterMode.ON_PREMISE) {
            hConf.clear();
            hConf.addResource(new File(systemArgs.getOption(ProgramOptionConstants.HADOOP_CONF_FILE)).toURI().toURL());
        }
        UserGroupInformation.setConfiguration(hConf);

        // CDAP configuration: always loaded from the localized file only.
        final CConfiguration cConf = CConfiguration.create();
        cConf.clear();
        cConf.addResource(new File(systemArgs.getOption(ProgramOptionConstants.CDAP_CONF_FILE)).toURI().toURL());
        maxStopSeconds = cConf.getLong("app.program.max.stop.seconds");
        if (clusterMode == ClusterMode.ISOLATED) {
            cConf.set("master.services.bind.address", context.getHost().getCanonicalHostName());
        }

        injector = Guice.createInjector(createModule(cConf, hConf, programOptions, programRunId));
        logAppenderInitializer = injector.getInstance(LogAppenderInitializer.class);
        coreServices = createCoreServices(injector, programOptions);
        programRunner = createProgramRunner(injector);

        final ProgramDescriptor descriptor = new ProgramDescriptor(
            programOptions.getProgramId(),
            readJsonFile(new File(systemArgs.getOption(ProgramOptionConstants.APP_SPEC_FILE)), ApplicationSpecification.class));
        program = Programs.create(
            cConf,
            programRunner,
            descriptor,
            Locations.toLocation(new File(systemArgs.getOption(ProgramOptionConstants.PROGRAM_JAR))),
            new File(systemArgs.getOption(ProgramOptionConstants.EXPANDED_PROGRAM_JAR)));
    }

    /**
     * Starts the core services, runs the program and blocks until it reaches a terminal state.
     *
     * <p>A listener on the program controller completes {@code controllerFuture} as soon as the
     * program reports any of the handled states, and completes {@code programCompletion} when
     * the program is completed, killed or fails. Whatever the outcome, the program, the program
     * runner (if closeable) and the core services are released exactly once before returning.
     *
     * <p>An interruption while waiting is logged and treated as a normal return.
     *
     * @throws RuntimeException carrying the root cause if the program terminates with an error
     */
    public void run() {
        startCoreServices();
        try {
            LOG.info("Starting program run {}", this.programRunId);
            final ProgramController controller = this.programRunner.run(this.program, this.programOptions);
            controller.addListener(new AbstractListener() {
                @Override
                public void init(ProgramController.State state, @Nullable Throwable cause) {
                    // Replay the state the program was already in when the listener was added.
                    switch (state) {
                        case ALIVE:
                            alive();
                            break;
                        case COMPLETED:
                            completed();
                            break;
                        case KILLED:
                            killed();
                            break;
                        case ERROR:
                            error(cause);
                            break;
                        default:
                            break;
                    }
                }

                @Override
                public void alive() {
                    AbstractProgramTwillRunnable.this.controllerFuture.complete(controller);
                }

                @Override
                public void completed() {
                    AbstractProgramTwillRunnable.this.controllerFuture.complete(controller);
                    AbstractProgramTwillRunnable.this.programCompletion.complete(ProgramController.State.COMPLETED);
                }

                @Override
                public void killed() {
                    AbstractProgramTwillRunnable.this.controllerFuture.complete(controller);
                    AbstractProgramTwillRunnable.this.programCompletion.complete(ProgramController.State.KILLED);
                }

                @Override
                public void error(Throwable cause) {
                    AbstractProgramTwillRunnable.this.controllerFuture.complete(controller);
                    AbstractProgramTwillRunnable.this.programCompletion.completeExceptionally(cause);
                }
            }, Threads.SAME_THREAD_EXECUTOR);

            // Block until the listener reports a terminal state.
            this.programCompletion.get();
        } catch (InterruptedException e) {
            LOG.warn("Program {} interrupted.", this.name, e);
        } catch (ExecutionException e) {
            LOG.error("Program {} execution failed.", this.name, e);
            throw Throwables.propagate(Throwables.getRootCause(e));
        } finally {
            // Single cleanup path for success, interruption and failure.
            LOG.info("Program run {} completed. Releasing resources.", this.programRunId);
            Closeables.closeQuietly(this.program);
            if (this.programRunner instanceof Closeable) {
                Closeables.closeQuietly((Closeable) this.programRunner);
            }
            stopCoreServices();
        }
    }

    /**
     * Dispatches a Twill command to the program controller, waiting for the controller to be
     * available and for the requested operation to finish.
     *
     * <p>Suspend and resume commands are supported, as well as an instances command whose
     * {@code count} option carries the new instance count. Anything else is logged and ignored.
     *
     * @param command the command to handle
     * @throws Exception if waiting for the controller or for the operation fails
     */
    public void handleCommand(Command command) throws Exception {
        if (ProgramCommands.SUSPEND.equals(command)) {
            controllerFuture.get().suspend().get();
        } else if (ProgramCommands.RESUME.equals(command)) {
            controllerFuture.get().resume().get();
        } else if (ProgramOptionConstants.INSTANCES.equals(command.getCommand())) {
            // Obtain the controller first, then parse the requested count.
            ProgramController controller = controllerFuture.get();
            int instanceCount = Integer.parseInt(command.getOptions().get("count"));
            controller.command(ProgramOptionConstants.INSTANCES, instanceCount).get();
        } else {
            LOG.warn("Ignore unsupported command: " + command);
        }
    }

    /**
     * Requests the running program to stop and waits for it to do so.
     *
     * <p>Returns immediately when the runnable was never initialized or the program has already
     * finished. Otherwise waits up to 5 seconds for the program controller, then up to
     * {@code maxStopSeconds} (60 seconds when that value is 0) for the stop to complete.
     *
     * @throws RuntimeException wrapping the interruption, execution failure or timeout; on
     *         interruption the thread's interrupt status is restored before throwing
     */
    public void stop() {
        try {
            // Snapshot the field so the null check and the isDone() call see the same future.
            CompletableFuture<ProgramController.State> completion = this.programCompletion;
            if (completion == null || completion.isDone()) {
                return;
            }
            // Bounded wait: the controller future may never complete if the program failed to start.
            ProgramController programController = this.controllerFuture.get(5L, TimeUnit.SECONDS);
            LOG.info("Stopping runnable: {}.", this.name);
            programController.stop().get(this.maxStopSeconds == 0 ? 60L : this.maxStopSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException | TimeoutException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Does nothing in this class.
     */
    public void destroy() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the system arguments derived from the Twill context that are merged into the
     * program options.
     *
     * <p>With a context available, the map holds the instance id, the instance count, the Twill
     * application run id and the canonical host name. Without one, only the instance id
     * ({@code "0"}) and instance count ({@code "1"}) defaults are returned; the run id and host
     * entries are omitted because there is nothing to derive them from.
     *
     * @return a new mutable map of extra system arguments
     */
    public Map<String, String> getExtraSystemArguments() {
        Map<String, String> extraArgs = new HashMap<>();
        TwillContext twillContext = this.context;
        if (twillContext == null) {
            // Previously the last two entries dereferenced the null context and threw a NullPointerException.
            extraArgs.put(ProgramOptionConstants.INSTANCE_ID, "0");
            extraArgs.put(ProgramOptionConstants.INSTANCES, "1");
            return extraArgs;
        }
        extraArgs.put(ProgramOptionConstants.INSTANCE_ID, Integer.toString(twillContext.getInstanceId()));
        extraArgs.put(ProgramOptionConstants.INSTANCES, Integer.toString(twillContext.getInstanceCount()));
        extraArgs.put(ProgramOptionConstants.TWILL_RUN_ID, twillContext.getApplicationRunId().getId());
        extraArgs.put(ProgramOptionConstants.HOST, twillContext.getHost().getCanonicalHostName());
        return extraArgs;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the Guice module used to build the injector for the program container.
     *
     * <p>The base is a {@link DistributedProgramContainerModule}. In isolated cluster mode it is
     * overridden with the standalone messaging modules and with a {@link Cancellable} binding
     * whose cancellation calls {@link #stop()} on this runnable.
     *
     * @param cConfiguration the CDAP configuration
     * @param configuration the Hadoop configuration
     * @param programOptions the program options; also determines the cluster mode
     * @param programRunId the id of the program run
     * @return the module to create the injector from
     */
    public Module createModule(CConfiguration cConfiguration, Configuration configuration, ProgramOptions programOptions, ProgramRunId programRunId) {
        final Module containerModule = new DistributedProgramContainerModule(
            cConfiguration, configuration, programRunId, programOptions.getArguments(), getServiceAnnouncer());

        if (ProgramRunners.getClusterMode(programOptions) != ClusterMode.ISOLATED) {
            return containerModule;
        }

        return Modules.override(containerModule).with(
            new MessagingServerRuntimeModule().getStandaloneModules(),
            new AbstractModule() {
                protected void configure() {
                    bind(Cancellable.class).toInstance(AbstractProgramTwillRunnable.this::stop);
                }
            });
    }

    /**
     * Returns the service announcer passed to the container module; this implementation hands
     * out the Twill context, which is {@code null} until {@code initialize} has been called.
     */
    @Nullable
    protected ServiceAnnouncer getServiceAnnouncer() {
        return context;
    }

    /**
     * Hook applied to the user arguments read from the program options file before the final
     * program options are built. This implementation returns the arguments unchanged.
     *
     * @param arguments the user arguments from the program options file
     * @return the arguments to use as user arguments
     */
    protected Arguments resolveScope(Arguments arguments) {
        return arguments;
    }

    /**
     * Creates the program runner by resolving the actual type argument {@code T} that the
     * concrete subclass supplies to {@link AbstractProgramTwillRunnable} and asking the injector
     * for an instance of it.
     *
     * @param injector the injector to obtain the program runner from
     * @return the program runner instance
     * @throws IllegalStateException if the subclass does not bind {@code T} to a concrete class
     */
    protected T createProgramRunner(Injector injector) {
        Type runnableSuperType = TypeToken.of(getClass()).getSupertype(AbstractProgramTwillRunnable.class).getType();
        // The supertype must carry its type argument.
        Preconditions.checkState(runnableSuperType instanceof ParameterizedType,
                                 "Invalid class %s. Expected to be a ParameterizedType.", getClass());

        Type programRunnerType = ((ParameterizedType) runnableSuperType).getActualTypeArguments()[0];
        // The type argument must be a plain class so that the injector can instantiate it.
        Preconditions.checkState(programRunnerType instanceof Class,
                                 "ProgramRunner type is not a class: %s", programRunnerType);

        @SuppressWarnings("unchecked")
        Class<T> programRunnerClass = (Class<T>) programRunnerType;
        return injector.getInstance(programRunnerClass);
    }

    /**
     * Reads the program options from the given file and returns a copy whose system arguments
     * also include {@link #getExtraSystemArguments()} and whose user arguments have been passed
     * through {@link #resolveScope(Arguments)}.
     *
     * @param programOptionsFile JSON file holding the serialized program options
     * @return the program options to run the program with
     * @throws IOException if the file cannot be read
     */
    private ProgramOptions createProgramOptions(File programOptionsFile) throws IOException {
        ProgramOptions storedOptions = readJsonFile(programOptionsFile, ProgramOptions.class);

        // Extra arguments win over stored ones with the same key.
        Map<String, String> systemArgs = new HashMap<>(storedOptions.getArguments().asMap());
        systemArgs.putAll(getExtraSystemArguments());

        return new SimpleProgramOptions(
            storedOptions.getProgramId(),
            new BasicArguments(systemArgs),
            resolveScope(storedOptions.getUserArguments()),
            storedOptions.isDebug());
    }

    /**
     * Deserializes the UTF-8 encoded JSON content of a file using the class-level Gson instance.
     *
     * <p>The reader is always closed. If both parsing and closing fail, the close failure is
     * attached to the parsing failure as a suppressed exception rather than replacing it.
     *
     * @param file the JSON file to read
     * @param cls the type to deserialize into
     * @param <U> the result type
     * @return the deserialized object
     * @throws IOException if the file cannot be opened, read or closed
     */
    private <U> U readJsonFile(File file, Class<U> cls) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
            return GSON.fromJson(reader, cls);
        }
    }

    /**
     * Collects the services that must be running for the program, in start order.
     *
     * <p>The metrics collection service and the ZooKeeper client always come first, followed by
     * the services specific to the cluster mode of the program.
     *
     * @param injector the injector to obtain service instances from
     * @param programOptions the program options; determines the cluster mode
     * @return the services in the order they should be started
     */
    private Deque<Service> createCoreServices(Injector injector, ProgramOptions programOptions) {
        final Deque<Service> services = new LinkedList<>();

        final MetricsCollectionService metricsService = injector.getInstance(MetricsCollectionService.class);
        services.add(metricsService);
        services.add(injector.getInstance(ZKClientService.class));

        switch (ProgramRunners.getClusterMode(programOptions)) {
            case ON_PREMISE:
                addOnPremiseServices(injector, programOptions, metricsService, services);
                break;
            case ISOLATED:
                addIsolatedServices(injector, services);
                break;
        }
        return services;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Appends the services needed in on-premise mode: the Kafka client, the broker service, the
     * stream coordinator client and a resource reporter for this runnable.
     *
     * @param injector the injector to obtain service instances from
     * @param programOptions the program options; supplies the program id for the reporter
     * @param metricsService the metrics service the reporter is created with
     * @param services the collection to append to
     */
    private void addOnPremiseServices(Injector injector, ProgramOptions programOptions, MetricsCollectionService metricsService, Collection<Service> services) {
        final KafkaClientService kafkaClient = injector.getInstance(KafkaClientService.class);
        services.add(kafkaClient);

        final BrokerService brokerService = injector.getInstance(BrokerService.class);
        services.add(brokerService);

        final StreamCoordinatorClient streamCoordinator = injector.getInstance(StreamCoordinatorClient.class);
        services.add(streamCoordinator);

        services.add(new ProgramRunnableResourceReporter(programOptions.getProgramId(), metricsService, context));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Appends the services needed in isolated mode: the messaging service (only when the bound
     * implementation is itself a {@link Service}), the transaction manager, the messaging HTTP
     * service, the runtime monitor server and the dataset service.
     *
     * @param injector the injector to obtain service instances from
     * @param collection the collection to append to
     */
    private void addIsolatedServices(Injector injector, Collection<Service> collection) {
        // Keep the declared type: MessagingService is not a Service, only some implementations are.
        MessagingService messagingService = injector.getInstance(MessagingService.class);
        if (messagingService instanceof Service) {
            collection.add((Service) messagingService);
        }
        collection.add(injector.getInstance(TransactionManager.class));
        collection.add(injector.getInstance(MessagingHttpService.class));
        collection.add(injector.getInstance(RuntimeMonitorServer.class));
        collection.add(injector.getInstance(DatasetService.class));
    }

    /**
     * Initializes the log appender and then starts every core service in order, waiting for each
     * to be running. If a service fails to start, the log appender is closed and the failure is
     * rethrown.
     */
    private void startCoreServices() {
        logAppenderInitializer.initialize();
        try {
            for (Service service : coreServices) {
                service.startAndWait();
            }
        } catch (Exception e) {
            logAppenderInitializer.close();
            throw e;
        }
    }

    /**
     * Stops the core services in the reverse of their start order and then closes the log
     * appender. A failure to stop one service is logged and does not prevent the remaining
     * services from being stopped.
     */
    private void stopCoreServices() {
        Iterator<Service> reverseOrder = coreServices.descendingIterator();
        while (reverseOrder.hasNext()) {
            Service service = reverseOrder.next();
            try {
                service.stopAndWait();
            } catch (Exception e) {
                LOG.warn("Exception raised when stopping service {} during program termination.", service, e);
            }
        }
        logAppenderInitializer.close();
    }
}
