package org.apache.flume.source;

import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/ExecSource.class */
/**
 * Event-driven source that launches an external command and forwards every
 * line the command writes to standard output to the channel processor as one
 * event.
 *
 * <p>The command runs on a dedicated single-thread executor created in
 * {@link #start()}. When restarting is enabled the command is relaunched after
 * it ends, waiting {@code restartThrottle} milliseconds between launches. The
 * command's standard error is drained on a daemon thread and optionally logged.
 */
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);
    private String command;
    private CounterGroup counterGroup;
    private ExecutorService executor;
    private Future<?> runnerFuture;
    private long restartThrottle;
    private boolean restart;
    private boolean logStderr;
    private ExecRunnable runner;

    /**
     * Task that runs the configured command, pumps its standard output into
     * the channel processor, and relaunches it while restarting is enabled.
     */
    private static class ExecRunnable implements Runnable {
        /** Exit code reported when the real one could not be obtained. */
        private static final String UNKNOWN_EXIT_CODE = "unknown";

        private final String command;
        private final ChannelProcessor channelProcessor;
        private final CounterGroup counterGroup;
        private volatile boolean restart;
        private final long restartThrottle;
        private final boolean logStderr;

        /** Child currently being read, or {@code null}; read by {@link #kill()} from another thread. */
        private volatile Process process;

        /** Set once by {@link #kill()}; tells the runner thread a shutdown was requested. */
        private volatile boolean killed;

        /**
         * @param command          command line; split on whitespace into program and arguments
         * @param channelProcessor destination for the generated events
         * @param counterGroup     counters updated for every line read
         * @param restart          whether to relaunch the command after it ends
         * @param restartThrottle  milliseconds to sleep before relaunching
         * @param logStderr        whether the command's standard error is logged
         */
        public ExecRunnable(String command, ChannelProcessor channelProcessor, CounterGroup counterGroup,
                boolean restart, long restartThrottle, boolean logStderr) {
            this.command = command;
            this.channelProcessor = channelProcessor;
            this.counterGroup = counterGroup;
            this.restartThrottle = restartThrottle;
            this.restart = restart;
            this.logStderr = logStderr;
        }

        /** Runs the command once, then again for as long as restarting stays enabled. */
        @Override // java.lang.Runnable
        public void run() {
            do {
                String exitCode = runOnce();
                if (this.restart) {
                    ExecSource.logger.info("Restarting in {}ms, exit code {}", Long.valueOf(this.restartThrottle), exitCode);
                    try {
                        Thread.sleep(this.restartThrottle);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            } while (this.restart);
        }

        /**
         * Launches the command, forwards its standard output line by line, and
         * always closes the reader and destroys the child before returning.
         *
         * @return the child's exit code as text, or {@code "unknown"} if the
         *         child never started or the wait for it was interrupted
         */
        private String runOnce() {
            String exitCode = UNKNOWN_EXIT_CODE;
            BufferedReader reader = null;
            Process child = null;
            try {
                child = new ProcessBuilder(this.command.split("\\s+")).start();
                // Publish the child so kill() can destroy it; readLine() below does not
                // react to thread interruption, so destroying the child is the only way
                // to unblock this thread during shutdown.
                this.process = child;
                // kill() may have run after start() but before the child was published.
                // In that case skip reading and let the finally block destroy the child.
                if (!this.killed) {
                    // NOTE: decoding here and getBytes() below both use the platform
                    // default charset.
                    reader = new BufferedReader(new InputStreamReader(child.getInputStream()));
                    StderrReader stderrReader = new StderrReader(
                            new BufferedReader(new InputStreamReader(child.getErrorStream())), this.logStderr);
                    stderrReader.setName("StderrReader-[" + this.command + "]");
                    stderrReader.setDaemon(true);
                    stderrReader.start();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        this.counterGroup.incrementAndGet("exec.lines.read");
                        this.channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
                    }
                }
            } catch (Exception e) {
                if (this.killed) {
                    // Destroying the child closes its streams, which surfaces here as an
                    // I/O failure; that is expected during shutdown, not an error.
                    ExecSource.logger.debug("Command terminated by stop request: " + this.command, e);
                } else {
                    ExecSource.logger.error("Failed while running command: " + this.command, e);
                }
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            } finally {
                this.process = null;
                closeQuietly(reader);
                exitCode = destroyAndWait(child);
            }
            return exitCode;
        }

        /** Closes the stdout reader if one was opened, logging rather than propagating failures. */
        private static void closeQuietly(BufferedReader reader) {
            if (reader == null) {
                return;
            }
            try {
                reader.close();
            } catch (IOException e) {
                ExecSource.logger.error("Failed to close reader for exec source", e);
            }
        }

        /**
         * Destroys the child, if any, and waits for it to exit.
         *
         * @return the exit code as text, or {@code "unknown"} when there is no
         *         child or the wait was interrupted (the interrupt flag is restored)
         */
        private static String destroyAndWait(Process child) {
            if (child == null) {
                return UNKNOWN_EXIT_CODE;
            }
            child.destroy();
            try {
                return String.valueOf(child.waitFor());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return UNKNOWN_EXIT_CODE;
            }
        }

        /** Enables or disables relaunching the command after it ends. */
        public void setRestart(boolean z) {
            this.restart = z;
        }

        /**
         * Requests shutdown from another thread: disables restarting and
         * destroys the running child, if any, so the runner thread stops
         * blocking on the child's output.
         */
        public void kill() {
            this.restart = false;
            this.killed = true;
            Process running = this.process;
            if (running != null) {
                running.destroy();
            }
        }
    }

    /**
     * Thread that drains a command's standard error until end of stream,
     * optionally logging each line with a running line number.
     */
    private static class StderrReader extends Thread {
        private final BufferedReader input;
        private final boolean logStderr;

        /**
         * @param bufferedReader reader over the command's standard error
         * @param z              whether each line read is logged
         */
        protected StderrReader(BufferedReader bufferedReader, boolean z) {
            this.input = bufferedReader;
            this.logStderr = z;
        }

        /** Reads until end of stream or an I/O failure, then closes the reader. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                int lineNumber = 0;
                String line;
                while ((line = this.input.readLine()) != null) {
                    // Lines are always consumed; they are only logged (and counted) on request.
                    if (this.logStderr) {
                        lineNumber++;
                        ExecSource.logger.info("StderrLogger[{}] = '{}'", Integer.valueOf(lineNumber), line);
                    }
                }
            } catch (IOException e) {
                ExecSource.logger.info("StderrLogger exiting", e);
            } finally {
                try {
                    if (this.input != null) {
                        this.input.close();
                    }
                } catch (IOException e) {
                    ExecSource.logger.error("Failed to close stderr reader for exec source", e);
                }
            }
        }
    }

    /**
     * Creates the executor, counters and runner, submits the runner, and then
     * marks the source as started.
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Exec source starting with command:{}", this.command);
        this.executor = Executors.newSingleThreadExecutor();
        this.counterGroup = new CounterGroup();
        this.runner = new ExecRunnable(this.command, getChannelProcessor(), this.counterGroup, this.restart, this.restartThrottle, this.logStderr);
        this.runnerFuture = this.executor.submit(this.runner);
        super.start();
        logger.debug("Exec source started");
    }

    /**
     * Stops the runner and waits for its executor to terminate.
     *
     * <p>The child process is destroyed explicitly because interrupting the
     * runner thread does not unblock a read on the child's output. If the
     * calling thread is interrupted while waiting, the wait is abandoned and
     * the interrupt flag is left set. Calling this before {@link #start()} is
     * tolerated.
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Stopping exec source with command:{}", this.command);
        if (this.runner != null) {
            this.runner.setRestart(false);
            this.runner.kill();
        }
        if (this.runnerFuture != null) {
            logger.debug("Stopping exec runner");
            this.runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }
        if (this.executor != null) {
            this.executor.shutdown();
            while (!this.executor.isTerminated()) {
                logger.debug("Waiting for exec executor service to stop");
                try {
                    this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                    Thread.currentThread().interrupt();
                    // With the flag set, awaitTermination would throw again immediately,
                    // so staying in the loop would only spin.
                    break;
                }
            }
        }
        super.stop();
        logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.counterGroup);
    }

    /**
     * Reads the command (required), restart throttle, restart flag and
     * stderr-logging flag from the context.
     *
     * @throws IllegalStateException if no command is configured
     */
    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.command = context.getString("command");
        Preconditions.checkState(this.command != null, "The parameter command must be specified");
        this.restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, Long.valueOf(ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE)).longValue();
        this.restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART, false).booleanValue();
        this.logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR, false).booleanValue();
    }
}
