package backtype.storm.spout;

import backtype.storm.generated.ShellComponent;
import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.multilang.ShellMsg;
import backtype.storm.multilang.SpoutMsg;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import clojure.lang.RT;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.guava.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout whose logic lives in an external subprocess.
 *
 * <p>The command given at construction time is launched through {@link ShellProcess} in
 * {@link #open}. Every {@code nextTuple}/{@code ack}/{@code fail} call is forwarded to the
 * subprocess as a {@link SpoutMsg}; the replies ({@code emit}, {@code log}, {@code metrics})
 * are processed until the subprocess answers with {@code sync}.
 *
 * <p>While the spout is activated, a background task runs once per second and halts the JVM
 * (via {@link #die}) when no message has been read from the subprocess for longer than
 * {@code supervisor.worker.timeout.secs}.
 */
public class ShellSpout implements ISpout {
    public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);

    private SpoutOutputCollector _collector;
    private String[] _command;
    private ShellProcess _process;
    private TopologyContext _context;
    /** Reused for every request sent to the subprocess; created lazily. */
    private SpoutMsg _spoutMsg;
    /** Heartbeat timeout in milliseconds, derived from {@code supervisor.worker.timeout.secs}. */
    private int workerTimeoutMills;
    /** Runs the heartbeat check; a new instance is created on every {@link #activate()}. */
    private ScheduledExecutorService heartBeatExecutorService;
    /** Wall-clock time (ms) of the last activation or the last message read from the subprocess. */
    private AtomicLong lastHeartbeatTimestamp;

    /**
     * Periodic check that kills the worker when the subprocess has been silent for longer
     * than the configured timeout.
     */
    private class SpoutHeartbeatTimerTask extends TimerTask {
        private final ShellSpout spout;

        public SpoutHeartbeatTimerTask(ShellSpout shellSpout) {
            this.spout = shellSpout;
        }

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            long lastHeartbeat = ShellSpout.this.getLastHeartbeat();
            // Explicit Object[] keeps this call valid for both the array and the varargs
            // flavour of the SLF4J API.
            ShellSpout.LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
                    new Object[] {now, lastHeartbeat, ShellSpout.this.workerTimeoutMills});
            if (now - lastHeartbeat > ShellSpout.this.workerTimeoutMills) {
                this.spout.die(new RuntimeException("subprocess heartbeat timeout"));
            }
        }
    }

    /**
     * Creates a spout from a {@link ShellComponent}, using its execution command followed by
     * its script as the command line.
     *
     * @param shellComponent component description providing the command and the script
     */
    public ShellSpout(ShellComponent shellComponent) {
        this(shellComponent.get_execution_command(), shellComponent.get_script());
    }

    /**
     * Creates a spout that will launch the given command line.
     *
     * @param strArr command and arguments handed to {@link ShellProcess}
     */
    public ShellSpout(String... strArr) {
        this.lastHeartbeatTimestamp = new AtomicLong();
        this._command = strArr;
    }

    /**
     * Stores the collector and context, reads the heartbeat timeout from the configuration and
     * launches the subprocess.
     *
     * <p>The heartbeat executor is intentionally not created here: it is created by
     * {@link #activate()} so that the spout can be deactivated and re-activated.
     *
     * @param map topology configuration; must contain {@code supervisor.worker.timeout.secs}
     * @param topologyContext context passed on to the subprocess launch and used for metrics lookup
     * @param spoutOutputCollector collector used for emitting tuples and reporting errors
     */
    @Override // backtype.storm.spout.ISpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        this._context = topologyContext;
        this.workerTimeoutMills = 1000 * RT.intCast(map.get("supervisor.worker.timeout.secs"));
        this._process = new ShellProcess(this._command);
        LOG.info("Launched subprocess with pid {}", this._process.launch(map, topologyContext));
    }

    /** Stops the heartbeat check (if running) and destroys the subprocess (if launched). */
    @Override // backtype.storm.spout.ISpout
    public void close() {
        stopHeartbeatExecutor();
        destroyProcess();
    }

    /** Sends a {@code next} request to the subprocess and processes its replies. */
    @Override // backtype.storm.spout.ISpout
    public void nextTuple() {
        prepareMessage("next").setId("");
        querySubprocess();
    }

    /**
     * Sends an {@code ack} request for the given message id and processes the replies.
     *
     * @param obj id of the acknowledged message
     */
    @Override // backtype.storm.spout.ISpout
    public void ack(Object obj) {
        prepareMessage("ack").setId(obj);
        querySubprocess();
    }

    /**
     * Sends a {@code fail} request for the given message id and processes the replies.
     *
     * @param obj id of the failed message
     */
    @Override // backtype.storm.spout.ISpout
    public void fail(Object obj) {
        prepareMessage("fail").setId(obj);
        querySubprocess();
    }

    /** Returns the shared request message with its command set, creating it on first use. */
    private SpoutMsg prepareMessage(String command) {
        if (this._spoutMsg == null) {
            this._spoutMsg = new SpoutMsg();
        }
        this._spoutMsg.setCommand(command);
        return this._spoutMsg;
    }

    /**
     * Applies a {@code metrics} message from the subprocess to the registered metric of the
     * same name.
     *
     * @throws RuntimeException if the name is missing or empty, no metric is registered under
     *     it, the metric is not an {@link IShellMetric}, or the update itself fails
     */
    private void handleMetrics(ShellMsg shellMsg) {
        String metricName = shellMsg.getMetricName();
        // A missing name is reported the same way as an empty one instead of as a bare NPE.
        if (metricName == null || metricName.isEmpty()) {
            throw new RuntimeException("Receive Metrics name is empty");
        }
        IMetric metric = this._context.getRegisteredMetricByName(metricName);
        if (metric == null) {
            throw new RuntimeException("Could not find metric by name[" + metricName + "] ");
        }
        if (!(metric instanceof IShellMetric)) {
            throw new RuntimeException("Metric[" + metricName + "] is not IShellMetric, can not call by RPC");
        }
        try {
            ((IShellMetric) metric).updateMetricFromRPC(shellMsg.getMetricParams());
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the pending request to the subprocess and handles replies until {@code sync}
     * is received. Every reply that carries a command refreshes the heartbeat timestamp.
     *
     * @throws RuntimeException wrapping any failure, with the process info and termination
     *     info strings as message
     */
    private void querySubprocess() {
        try {
            this._process.writeSpoutMsg(this._spoutMsg);
            while (true) {
                ShellMsg reply = this._process.readShellMsg();
                String command = reply.getCommand();
                if (command == null) {
                    throw new IllegalArgumentException("Command not found in spout message: " + reply);
                }
                setHeartbeat();
                if (command.equals("sync")) {
                    return;
                }
                if (command.equals("log")) {
                    handleLog(reply);
                } else if (command.equals("emit")) {
                    handleEmit(reply);
                } else if (command.equals("metrics")) {
                    handleMetrics(reply);
                } else {
                    throw new RuntimeException("Unknown command received: " + command);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    this._process.getProcessInfoString() + this._process.getProcessTerminationInfoString(), e);
        }
    }

    /**
     * Emits the tuple described by an {@code emit} message. A task of {@code 0} results in a
     * regular emit (the resulting task ids are written back to the subprocess when it asked
     * for them); any other task results in a direct emit to that task.
     */
    private void handleEmit(ShellMsg shellMsg) throws Exception {
        String stream = shellMsg.getStream();
        long task = shellMsg.getTask();
        List<Object> tuple = shellMsg.getTuple();
        Object messageId = shellMsg.getId();
        if (task == 0) {
            List<Integer> taskIds = this._collector.emit(stream, tuple, messageId);
            if (shellMsg.areTaskIdsNeeded()) {
                this._process.writeTaskIds(taskIds);
            }
        } else {
            this._collector.emitDirect((int) task, stream, tuple, messageId);
        }
    }

    /** Logs a {@code log} message from the subprocess at the level the message requests. */
    private void handleLog(ShellMsg shellMsg) {
        String str = "ShellLog " + this._process.getProcessInfoString() + " " + shellMsg.getMsg();
        switch (shellMsg.getLogLevel()) {
            case TRACE:
                LOG.trace(str);
                break;
            case DEBUG:
                LOG.debug(str);
                break;
            case INFO:
                LOG.info(str);
                break;
            case WARN:
                LOG.warn(str);
                break;
            case ERROR:
                LOG.error(str);
                break;
            default:
                LOG.info(str);
                break;
        }
    }

    /**
     * Starts the once-per-second heartbeat check.
     *
     * <p>A fresh executor is created on every call: {@link #deactivate()} shuts the previous
     * one down, and a terminated executor rejects new tasks, so reusing it would make
     * re-activation fail.
     */
    @Override // backtype.storm.spout.ISpout
    public void activate() {
        LOG.info("Start checking heartbeat...");
        // Reset first so the check does not judge the subprocess by a timestamp taken
        // before this activation.
        setHeartbeat();
        // Guard against activate() being called twice without deactivate() in between.
        stopHeartbeatExecutor();
        this.heartBeatExecutorService =
                MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        this.heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1L, 1L, TimeUnit.SECONDS);
    }

    /** Stops the heartbeat check; a later {@link #activate()} starts it again. */
    @Override // backtype.storm.spout.ISpout
    public void deactivate() {
        stopHeartbeatExecutor();
    }

    /** Shuts down the heartbeat executor if one has been created. */
    private void stopHeartbeatExecutor() {
        ScheduledExecutorService executor = this.heartBeatExecutorService;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /** Destroys the subprocess if it has been created. */
    private void destroyProcess() {
        if (this._process != null) {
            this._process.destroy();
        }
    }

    private void setHeartbeat() {
        this.lastHeartbeatTimestamp.set(System.currentTimeMillis());
    }

    /**
     * @return the timestamp (ms since the epoch) recorded by the most recent heartbeat
     */
    public long getLastHeartbeat() {
        return this.lastHeartbeatTimestamp.get();
    }

    /**
     * Stops the heartbeat check, logs and reports the error, destroys the subprocess and
     * terminates the JVM with exit code 11.
     *
     * @param th the reason the spout is being halted
     */
    public void die(Throwable th) {
        stopHeartbeatExecutor();
        LOG.error("Halting process: ShellSpout died.", th);
        this._collector.reportError(th);
        destroyProcess();
        System.exit(11);
    }
}
