package org.apache.nifi.minifi.bootstrap;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.minifi.MiNiFiServer;
import org.apache.nifi.minifi.status.StatusRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/minifi/bootstrap/BootstrapListener.class */
/**
 * Two-way command channel between a running MiNiFi instance and its Bootstrap process.
 *
 * <p>Outbound: {@link #sendCommand(String, String[])} opens a short-lived connection to the
 * Bootstrap on {@code localhost:bootstrapPort}, writes a single text line and waits for an
 * {@code OK} reply.
 *
 * <p>Inbound: {@link #start()} binds a server socket to an ephemeral port on {@code localhost},
 * reports that port together with a randomly generated secret key to the Bootstrap, and from
 * then on dispatches every incoming request to the handler registered for its request type.
 */
public class BootstrapListener implements BootstrapCommunicator {
    private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);

    private static final String PING = "PING";
    private static final String RELOAD = "RELOAD";
    private static final String SHUTDOWN = "SHUTDOWN";
    private static final String STARTED = "STARTED";
    private static final String DUMP = "DUMP";
    private static final String FLOW_STATUS_REPORT = "FLOW_STATUS_REPORT";
    private static final String ENV = "ENV";
    private static final String PORT = "PORT";
    private static final String LOCALHOST = "localhost";

    private static final int LISTENER_EXECUTOR_THREAD_COUNT = 2;
    /** Accept timeout of the server socket; bounds how long the accept loop waits before re-checking the stop flag. */
    private static final int ACCEPT_TIMEOUT_MILLIS = 2000;
    /** Read timeout applied to every connection accepted from the Bootstrap. */
    private static final int REQUEST_READ_TIMEOUT_MILLIS = 5000;
    /** Read timeout applied while waiting for the Bootstrap's reply to an outbound command. */
    private static final int COMMAND_RESPONSE_TIMEOUT_MILLIS = 60000;

    private final MiNiFiServer minifiServer;
    private final int bootstrapPort;
    private Listener listener;
    private final Map<String, BiConsumer<String[], OutputStream>> messageHandlers = new HashMap<>();
    // Declared before bootstrapRequestReader on purpose: field initializers run in textual order.
    private final String secretKey = UUID.randomUUID().toString();
    private final BootstrapRequestReader bootstrapRequestReader = new BootstrapRequestReader(this.secretKey);
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Creates the listener and registers the built-in request handlers. No sockets are opened
     * until {@link #start()} is called.
     *
     * @param minifiServer  server that is stopped on RELOAD/SHUTDOWN and queried for status reports
     * @param bootstrapPort local port on which the Bootstrap process accepts commands
     */
    public BootstrapListener(MiNiFiServer minifiServer, int bootstrapPort) {
        this.minifiServer = minifiServer;
        this.bootstrapPort = bootstrapPort;
        this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        registerHandlers();
    }

    /**
     * Binds the server socket to an ephemeral port on localhost, starts the daemon accept
     * thread and sends the chosen port plus the secret key to the Bootstrap.
     *
     * @throws IOException if the server socket cannot be set up or the Bootstrap cannot be reached
     */
    public void start() throws IOException {
        logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);

        final ServerSocket serverSocket = new ServerSocket();
        final int localPort;
        try {
            serverSocket.bind(new InetSocketAddress(LOCALHOST, 0));
            serverSocket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
            localPort = serverSocket.getLocalPort();
        } catch (final IOException | RuntimeException e) {
            // Nothing owns the socket yet, so release it here instead of leaking the descriptor.
            try {
                serverSocket.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);

        listener = new Listener(serverSocket);
        final Thread listenThread = new Thread(listener);
        listenThread.setDaemon(true);
        listenThread.setName("Listen to Bootstrap");
        listenThread.start();

        logger.debug("Notifying Bootstrap that local port is {}", localPort);
        sendCommand(PORT, new String[]{String.valueOf(localPort), secretKey});
    }

    /**
     * Stops the request listener (if it was started) and sends {@code RELOAD} to the Bootstrap.
     *
     * @throws IOException if the command cannot be delivered
     */
    public void reload() throws IOException {
        stopListener();
        sendCommand(RELOAD, new String[0]);
    }

    /**
     * Stops the request listener (if it was started) and sends {@code SHUTDOWN} to the Bootstrap.
     *
     * @throws IOException if the command cannot be delivered
     */
    public void stop() throws IOException {
        stopListener();
        sendCommand(SHUTDOWN, new String[0]);
    }

    /**
     * Sends {@code STARTED <status>} to the Bootstrap.
     *
     * @param status value transmitted as the single argument of the command
     * @throws IOException if the command cannot be delivered
     */
    public void sendStartedStatus(boolean status) throws IOException {
        logger.debug("Notifying Bootstrap that the status of starting MiNiFi is {}", status);
        sendCommand(STARTED, new String[]{String.valueOf(status)});
    }

    /**
     * Sends one command line ({@code command arg1 arg2 ...\n}, UTF-8) to the Bootstrap and reads
     * a single response line. Any response other than {@code OK} is logged as an error but does
     * not raise an exception.
     *
     * @param command command keyword
     * @param args    arguments appended to the keyword, each preceded by a single space
     * @throws IOException if connecting, writing or reading fails (including a read timeout)
     */
    public void sendCommand(String command, String[] args) throws IOException {
        try (Socket socket = new Socket()) {
            socket.setSoTimeout(COMMAND_RESPONSE_TIMEOUT_MILLIS);
            socket.connect(new InetSocketAddress(LOCALHOST, bootstrapPort));

            final StringBuilder lineBuilder = new StringBuilder(command);
            for (final String arg : args) {
                lineBuilder.append(" ").append(arg);
            }
            lineBuilder.append("\n");
            final String commandLine = lineBuilder.toString();

            // NOTE(review): for the PORT command this line contains the secret key - confirm that
            // exposing it at debug level is acceptable.
            logger.debug("Sending command to Bootstrap: {}", commandLine);

            final OutputStream out = socket.getOutputStream();
            out.write(commandLine.getBytes(StandardCharsets.UTF_8));
            out.flush();

            logger.debug("Awaiting response from Bootstrap...");
            // Decode the reply with the same charset the request was encoded with.
            final BufferedReader responseReader =
                    new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            final String response = responseReader.readLine();
            if ("OK".equals(response)) {
                logger.info("Successfully initiated communication with Bootstrap");
            } else {
                logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from MiNiFi");
            }
        }
    }

    /**
     * Registers a handler for a request type unless one is already registered for it.
     *
     * @param command request type the handler is responsible for
     * @param handler callback receiving the request arguments and the connection's output stream
     */
    public void registerMessageHandler(String command, BiConsumer<String[], OutputStream> handler) {
        messageHandlers.putIfAbsent(command, handler);
    }

    /** Stops the accept loop if {@link #start()} created one. */
    private void stopListener() {
        if (listener != null) {
            listener.stop();
        }
    }

    /** Installs the built-in handlers for PING, RELOAD, SHUTDOWN, DUMP, FLOW_STATUS_REPORT and ENV. */
    private void registerHandlers() {
        messageHandlers.putIfAbsent(PING, (args, out) -> {
            logger.debug("Received PING request from Bootstrap; responding");
            echoRequestCmd(PING, out);
            logger.debug("Responded to PING request from Bootstrap");
        });
        messageHandlers.putIfAbsent(RELOAD, (args, out) -> {
            logger.info("Received RELOAD request from Bootstrap");
            echoRequestCmd(RELOAD, out);
            logger.info("Responded to RELOAD request from Bootstrap, stopping MiNiFi Server");
            minifiServer.stop(true);
        });
        messageHandlers.putIfAbsent(SHUTDOWN, (args, out) -> {
            logger.info("Received SHUTDOWN request from Bootstrap");
            echoRequestCmd(SHUTDOWN, out);
            logger.info("Responded to SHUTDOWN request from Bootstrap, stopping MiNiFi Server");
            minifiServer.stop(false);
        });
        messageHandlers.putIfAbsent(DUMP, (args, out) -> {
            logger.info("Received DUMP request from Bootstrap");
            writeDump(out);
        });
        messageHandlers.putIfAbsent(FLOW_STATUS_REPORT, (args, out) -> {
            logger.info("Received FLOW_STATUS_REPORT request from Bootstrap");
            if (args == null || args.length == 0) {
                // Fail with a descriptive message instead of a bare index/null failure.
                throw new IllegalArgumentException("FLOW_STATUS_REPORT request requires a status query argument");
            }
            writeStatusReport(args[0], out);
        });
        messageHandlers.putIfAbsent(ENV, (args, out) -> {
            logger.info("Received ENV request from Bootstrap");
            writeEnv(out);
        });
    }

    /**
     * Serializes the server's status report for the given query as JSON onto the stream.
     *
     * @param statusRequest query string handed to {@code MiNiFiServer.getStatusReport}
     * @param outputStream  destination of the JSON document
     * @throws UncheckedIOException if writing fails
     */
    private void writeStatusReport(String statusRequest, OutputStream outputStream) throws StatusRequestException {
        try {
            objectMapper.writeValue(outputStream, minifiServer.getStatusReport(statusRequest));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Writes every JVM system property as a {@code key=value} line and closes the stream.
     *
     * @param outputStream destination; closed when this method returns
     * @throws UncheckedIOException if writing fails
     */
    private static void writeEnv(OutputStream outputStream) {
        // NOTE(review): the writer uses the platform default charset (unchanged from the previous
        // behaviour); confirm what the Bootstrap side decodes with before switching to UTF-8.
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
            final StringBuilder properties = new StringBuilder();
            System.getProperties().forEach(
                    (key, value) -> properties.append(key).append("=").append(value).append("\n"));
            writer.write(properties.toString());
            writer.flush();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Writes the text returned by {@code DumpUtil.getDump()} to the stream and flushes it. The
     * stream is intentionally left open; the request handler closes the socket afterwards.
     *
     * @param outputStream destination of the dump
     * @throws UncheckedIOException if writing fails
     */
    private void writeDump(OutputStream outputStream) {
        try {
            // NOTE(review): platform default charset, unchanged from the previous behaviour.
            final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream));
            writer.write(DumpUtil.getDump());
            writer.flush();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Acknowledges a request by writing the command keyword followed by a newline (UTF-8),
     * then closes the stream.
     *
     * @param command      keyword echoed back to the Bootstrap
     * @param outputStream destination; closed on success
     * @throws UncheckedIOException if writing fails
     */
    private void echoRequestCmd(String command, OutputStream outputStream) {
        try {
            outputStream.write((command + "\n").getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
            outputStream.close();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Accept loop for connections from the Bootstrap. Each accepted connection is handed to a
     * fixed pool of {@link #LISTENER_EXECUTOR_THREAD_COUNT} worker threads.
     */
    private class Listener implements Runnable {
        private final ServerSocket serverSocket;
        private final ExecutorService executor = Executors.newFixedThreadPool(LISTENER_EXECUTOR_THREAD_COUNT);
        private volatile boolean stopped = false;

        public Listener(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        /** Flags the loop to end, interrupts the workers and closes the server socket. */
        public void stop() {
            stopped = true;
            executor.shutdownNow();
            try {
                serverSocket.close();
            } catch (final IOException e) {
                // Best effort: the listener is going away regardless, so only record the failure.
                logger.debug("Failed to close Bootstrap listener socket due to {}", e.toString());
            }
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    final Socket socket;
                    try {
                        logger.debug("Listening for Bootstrap Requests");
                        socket = serverSocket.accept();
                    } catch (final SocketTimeoutException e) {
                        // The accept timeout only exists so the stop flag is re-checked periodically.
                        if (stopped) {
                            return;
                        }
                        continue;
                    } catch (final IOException e) {
                        // Closing the server socket in stop() surfaces here; that is not an error.
                        if (stopped) {
                            return;
                        }
                        throw e;
                    }

                    logger.debug("Received connection from Bootstrap");
                    try {
                        socket.setSoTimeout(REQUEST_READ_TIMEOUT_MILLIS);
                        executor.submit(() -> handleBootstrapRequest(socket));
                    } catch (final IOException | RuntimeException e) {
                        // The worker never took ownership of the connection, so close it here.
                        closeSocket(socket);
                        if (stopped) {
                            return;
                        }
                        throw e;
                    }
                } catch (final Throwable t) {
                    logger.error("Failed to process request from Bootstrap due to " + t, t);
                }
            }
        }

        /** Reads one request from the connection, invokes its handler and always closes the socket. */
        private void handleBootstrapRequest(Socket socket) {
            try {
                final BootstrapRequest request = bootstrapRequestReader.readRequest(socket.getInputStream());
                final String requestType = request.getRequestType();
                final BiConsumer<String[], OutputStream> handler = messageHandlers.get(requestType);
                if (handler == null) {
                    logger.warn("There is no handler defined for the {}", requestType);
                } else {
                    handler.accept(request.getArgs(), socket.getOutputStream());
                }
            } catch (final Throwable t) {
                logger.error("Failed to process request from Bootstrap due to " + t, t);
            } finally {
                closeSocket(socket);
            }
        }

        /** Closes the connection, logging (not propagating) a failure to do so. */
        private void closeSocket(Socket socket) {
            try {
                socket.close();
            } catch (final IOException e) {
                logger.warn("Failed to close socket to Bootstrap due to {}", e.toString());
            }
        }
    }
}
