package org.apache.storm.spout;

import clojure.lang.RT;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.generated.ShellComponent;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.rpc.IShellMetric;
import org.apache.storm.multilang.ShellMsg;
import org.apache.storm.multilang.SpoutMsg;
import org.apache.storm.shade.com.google.common.util.concurrent.MoreExecutors;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.ShellLogHandler;
import org.apache.storm.utils.ShellProcess;
import org.apache.storm.utils.ShellUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/spout/ShellSpout.class */
/**
 * Spout that delegates its work to an external subprocess driven through {@link ShellProcess}.
 *
 * <p>Every lifecycle call (next / ack / fail / activate / deactivate) is written to the
 * subprocess as a {@link SpoutMsg}; the spout then reads {@link ShellMsg} replies and handles
 * them until a {@code "sync"} command arrives. After {@link #activate()}, a task scheduled once
 * per second calls {@link #die(Throwable)} when a reply has been pending for longer than the
 * configured timeout.
 */
public class ShellSpout implements ISpout {
    public static final Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
    private static final long serialVersionUID = 5982357019665454L;

    private SpoutOutputCollector _collector;
    private String[] _command;
    /** Extra environment handed to the subprocess; may be replaced (even by null) via setEnv. */
    private Map<String, String> env = new HashMap<>();
    private ShellLogHandler _logHandler;
    private ShellProcess _process;
    /** Cleared by {@link #close()}; {@link #die(Throwable)} only exits the JVM while it is set. */
    private volatile boolean _running = true;
    /** Failure recorded by {@link #die(Throwable)}; rethrown by the next sync command. */
    private volatile RuntimeException _exception;
    private TopologyContext _context;
    /** Reused message instance, created lazily on the first sync command. */
    private SpoutMsg _spoutMsg;
    /** Subprocess timeout in milliseconds, computed in {@link #open}. */
    private int workerTimeoutMills;
    private ScheduledExecutorService heartBeatExecutorService;
    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
    private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
    private boolean changeDirectory = true;

    /**
     * Periodic check that kills the spout when a subprocess reply has been pending for longer
     * than the worker timeout.
     */
    private static class SpoutHeartbeatTimerTask extends TimerTask {
        private final ShellSpout spout;

        public SpoutHeartbeatTimerTask(ShellSpout shellSpout) {
            this.spout = shellSpout;
        }

        @Override
        public void run() {
            long lastHeartbeat = spout.getLastHeartbeat();
            long now = System.currentTimeMillis();
            boolean waiting = spout.waitingOnSubprocess.get();
            LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
                    lastHeartbeat, waiting, spout.workerTimeoutMills);
            // Only a pending request can time out; an idle spout is never considered dead.
            if (waiting && now - lastHeartbeat > spout.workerTimeoutMills) {
                spout.die(new RuntimeException("subprocess heartbeat timeout"));
            }
        }
    }

    /**
     * Builds a spout from a shell component's execution command and script.
     *
     * @param shellComponent component supplying the command and the script to run
     */
    public ShellSpout(ShellComponent shellComponent) {
        this(shellComponent.get_execution_command(), shellComponent.get_script());
    }

    /**
     * Builds a spout that will launch the given command line.
     *
     * @param strArr command and arguments passed to {@link ShellProcess}
     */
    public ShellSpout(String... strArr) {
        this._command = strArr;
    }

    /**
     * Sets the environment applied to the subprocess when the spout is opened.
     *
     * @param map environment variables; a null or empty map means no extra environment
     * @return this spout, for chaining
     */
    public ShellSpout setEnv(Map<String, String> map) {
        this.env = map;
        return this;
    }

    /**
     * Returns the flag that is passed to {@link ShellProcess#launch} as its last argument.
     */
    public boolean shouldChangeChildCWD() {
        return this.changeDirectory;
    }

    /**
     * Sets the flag that is passed to {@link ShellProcess#launch} as its last argument.
     *
     * @param z new value of the change-directory flag
     */
    public void changeChildCWD(boolean z) {
        this.changeDirectory = z;
    }

    /**
     * Stores the collector and context, resolves the subprocess timeout, launches the
     * subprocess, sets up the log handler and creates the heartbeat executor.
     *
     * <p>The timeout is taken from {@code Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS} when the
     * configuration contains that key, otherwise from {@code Config.SUPERVISOR_WORKER_TIMEOUT_SECS}.
     */
    @Override // org.apache.storm.spout.ISpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        this._context = topologyContext;

        Object timeoutSecs = map.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)
                ? map.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)
                : map.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS);
        this.workerTimeoutMills = 1000 * RT.intCast(timeoutSecs);

        this._process = new ShellProcess(this._command);
        // setEnv accepts any map, including null; treat null like "nothing to set".
        if (this.env != null && !this.env.isEmpty()) {
            this._process.setEnv(this.env);
        }
        LOG.info("Launched subprocess with pid {}", this._process.launch(map, topologyContext, this.changeDirectory));

        this._logHandler = ShellUtils.getLogHandler(map);
        this._logHandler.setUpContext(ShellSpout.class, this._process, this._context);
        this.heartBeatExecutorService = newHeartbeatExecutor();
    }

    /**
     * Stops the heartbeat executor, destroys the subprocess and marks the spout as not running.
     *
     * <p>Safe to call even when {@link #open} did not complete: missing resources are skipped
     * so that whatever was created is still released.
     */
    @Override // org.apache.storm.spout.ISpout
    public void close() {
        if (this.heartBeatExecutorService != null) {
            this.heartBeatExecutorService.shutdownNow();
        }
        if (this._process != null) {
            this._process.destroy();
        }
        this._running = false;
    }

    @Override // org.apache.storm.spout.ISpout
    public void nextTuple() {
        sendSyncCommand("next", "");
    }

    @Override // org.apache.storm.spout.ISpout
    public void ack(Object obj) {
        sendSyncCommand("ack", obj);
    }

    @Override // org.apache.storm.spout.ISpout
    public void fail(Object obj) {
        sendSyncCommand("fail", obj);
    }

    /**
     * Sends one command to the subprocess and processes its replies until "sync".
     *
     * @param str command name written to the subprocess
     * @param obj message id attached to the command
     * @throws RuntimeException the failure recorded by {@link #die(Throwable)}, if any, or any
     *     failure raised while talking to the subprocess
     */
    private void sendSyncCommand(String str, Object obj) {
        if (this._exception != null) {
            throw this._exception;
        }
        if (this._spoutMsg == null) {
            this._spoutMsg = new SpoutMsg();
        }
        this._spoutMsg.setCommand(str);
        this._spoutMsg.setId(obj);
        querySubprocess();
    }

    /**
     * Applies a "metrics" reply to the registered {@link IShellMetric} of the same name.
     *
     * @param shellMsg reply carrying the metric name and parameters
     * @throws RuntimeException if the name is missing or empty, no metric is registered under
     *     it, the metric is not an {@link IShellMetric}, or the update itself fails
     */
    private void handleMetrics(ShellMsg shellMsg) {
        String metricName = shellMsg.getMetricName();
        // A reply without a name must fail with the descriptive error, not a bare NPE.
        if (metricName == null || metricName.isEmpty()) {
            throw new RuntimeException("Receive Metrics name is empty");
        }
        IMetric metric = this._context.getRegisteredMetricByName(metricName);
        if (metric == null) {
            throw new RuntimeException("Could not find metric by name[" + metricName + "] ");
        }
        if (!(metric instanceof IShellMetric)) {
            throw new RuntimeException("Metric[" + metricName + "] is not IShellMetric, can not call by RPC");
        }
        try {
            ((IShellMetric) metric).updateMetricFromRPC(shellMsg.getMetricParams());
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the pending {@link SpoutMsg} and handles replies until the subprocess sends "sync".
     *
     * <p>The waiting flag is raised for the duration of the exchange so the heartbeat task can
     * detect a stalled subprocess; it is always lowered again on exit. Any failure is rethrown
     * as a {@link RuntimeException} whose message carries the process info strings.
     */
    private void querySubprocess() {
        try {
            markWaitingSubprocess();
            this._process.writeSpoutMsg(this._spoutMsg);
            while (true) {
                ShellMsg reply = this._process.readShellMsg();
                String command = reply.getCommand();
                if (command == null) {
                    throw new IllegalArgumentException("Command not found in spout message: " + reply);
                }
                // Any well-formed reply counts as a sign of life.
                setHeartbeat();
                switch (command) {
                    case "sync":
                        return;
                    case "log":
                        this._logHandler.log(reply);
                        break;
                    case "error":
                        handleError(reply.getMsg());
                        break;
                    case "emit": {
                        String stream = reply.getStream();
                        long task = reply.getTask();
                        List<Object> tuple = reply.getTuple();
                        Object id = reply.getId();
                        if (task == 0) {
                            // Task 0 means a regular emit; the target task ids are sent back on request.
                            List<Integer> taskIds = this._collector.emit(stream, tuple, id);
                            if (reply.areTaskIdsNeeded()) {
                                this._process.writeTaskIds(taskIds);
                            }
                        } else {
                            this._collector.emitDirect((int) task, stream, tuple, id);
                        }
                        break;
                    }
                    case "metrics":
                        handleMetrics(reply);
                        break;
                    default:
                        throw new RuntimeException("Unknown command received: " + command);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    this._process.getProcessInfoString() + this._process.getProcessTerminationInfoString(), e);
        } finally {
            completedWaitingSubprocess();
        }
    }

    /** Reports an "error" reply from the subprocess to the collector. */
    private void handleError(String str) {
        this._collector.reportError(new Exception("Shell Process Exception: " + str));
    }

    /**
     * Starts the once-per-second heartbeat check (recreating the executor if a previous
     * {@link #deactivate()} shut it down) and sends "activate" to the subprocess.
     */
    @Override // org.apache.storm.spout.ISpout
    public void activate() {
        LOG.info("Start checking heartbeat...");
        setHeartbeat();
        if (this.heartBeatExecutorService.isShutdown()) {
            this.heartBeatExecutorService = newHeartbeatExecutor();
        }
        this.heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1L, 1L, TimeUnit.SECONDS);
        sendSyncCommand("activate", "");
    }

    /**
     * Sends "deactivate" to the subprocess and stops the heartbeat check.
     *
     * <p>The executor is shut down even when the command fails, so the periodic task does not
     * outlive a failed deactivation.
     */
    @Override // org.apache.storm.spout.ISpout
    public void deactivate() {
        try {
            sendSyncCommand("deactivate", "");
        } finally {
            if (this.heartBeatExecutorService != null) {
                this.heartBeatExecutorService.shutdownNow();
            }
        }
    }

    /** Creates the single-threaded scheduler used for the heartbeat check. */
    private static ScheduledExecutorService newHeartbeatExecutor() {
        return MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    private void setHeartbeat() {
        this.lastHeartbeatTimestamp.set(System.currentTimeMillis());
    }

    /**
     * Returns the time, in epoch milliseconds, of the last recorded heartbeat.
     *
     * <p>Public in this source (the decompiler notes it was originally private); kept public so
     * the visible interface does not change.
     */
    public long getLastHeartbeat() {
        return this.lastHeartbeatTimestamp.get();
    }

    private void markWaitingSubprocess() {
        setHeartbeat();
        this.waitingOnSubprocess.compareAndSet(false, true);
    }

    private void completedWaitingSubprocess() {
        this.waitingOnSubprocess.compareAndSet(true, false);
    }

    /**
     * Records and reports a fatal failure, then exits the JVM with status 11 when the spout is
     * still running or the cause is an {@link Error}.
     *
     * <p>The failure is stored so that subsequent sync commands rethrow it. Public in this
     * source (the decompiler notes it was originally private); kept public so the visible
     * interface does not change.
     *
     * @param th cause of the failure
     */
    public void die(Throwable th) {
        String processInfo = this._process.getProcessInfoString() + this._process.getProcessTerminationInfoString();
        this._exception = new RuntimeException(processInfo, th);
        LOG.error(String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
                Arrays.toString(this._command), processInfo), th);
        this._collector.reportError(th);
        if (this._running || (th instanceof Error)) {
            System.exit(11);
        }
    }
}
