package com.signalfx.signalflow;

import com.signalfx.connection.AbstractHttpReceiverConnection;
import com.signalfx.endpoint.SignalFxEndpoint;
import com.signalfx.shaded.apache.commons.io.IOUtils;
import com.signalfx.shaded.apache.http.HttpEntity;
import com.signalfx.shaded.apache.http.StatusLine;
import com.signalfx.shaded.apache.http.client.config.RequestConfig;
import com.signalfx.shaded.apache.http.client.methods.CloseableHttpResponse;
import com.signalfx.shaded.apache.http.client.methods.HttpPost;
import com.signalfx.shaded.apache.http.client.methods.HttpUriRequest;
import com.signalfx.shaded.apache.http.client.utils.URIBuilder;
import com.signalfx.shaded.apache.http.entity.StringEntity;
import com.signalfx.shaded.apache.http.impl.conn.BasicHttpClientConnectionManager;
import com.signalfx.shaded.apache.http.message.BasicNameValuePair;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/signalfx/signalflow/ServerSentEventsTransport.class */
public class ServerSentEventsTransport implements SignalFlowTransport {
    /** Logger used by the transport-level methods of this class. */
    protected static final Logger log = LoggerFactory.getLogger((Class<?>) ServerSentEventsTransport.class);
    /** Default timeout value (1000); presumably milliseconds, like TransportConnection.DEFAULT_TIMEOUT_MS -- TODO confirm. */
    public static final Integer DEFAULT_TIMEOUT = 1000;
    /** Access token passed as the first argument of every TransportConnection.post call. */
    protected final String token;
    /** Endpoint handed to every TransportConnection this transport creates. */
    protected final SignalFxEndpoint endpoint;
    /** Base request path, built by the constructor as "/v" + version + "/signalflow". */
    protected final String path;
    /**
     * Timeout handed to each new TransportConnection as its second constructor argument.
     * The builder supplies its own value multiplied by 1000, so this is presumably
     * milliseconds -- TODO confirm against AbstractHttpReceiverConnection.
     */
    protected Integer timeout;

    /* loaded from: input_file:com/signalfx/signalflow/ServerSentEventsTransport$TransportBuilder.class */
    /**
     * Fluent builder for {@link ServerSentEventsTransport}.
     *
     * <p>Defaults: protocol {@code "https"}, host {@code SignalFlowTransport.DEFAULT_HOST},
     * port {@code 443}, timeout {@code 1} and API version {@code 2}. Every setter returns
     * this builder so calls can be chained.
     */
    public static class TransportBuilder {
        private String token;
        private String protocol = "https";
        private String host = SignalFlowTransport.DEFAULT_HOST;
        private int port = 443;
        private int timeout = 1;
        private int version = 2;

        /**
         * @param str access token the built transport will send with its requests
         */
        public TransportBuilder(String str) {
            token = str;
        }

        /** Sets the URL scheme used to build the endpoint. */
        public TransportBuilder setProtocol(String str) {
            protocol = str;
            return this;
        }

        /** Sets the host name used to build the endpoint. */
        public TransportBuilder setHost(String str) {
            host = str;
            return this;
        }

        /** Sets the port used to build the endpoint. */
        public TransportBuilder setPort(int i) {
            port = i;
            return this;
        }

        /**
         * Sets the timeout. {@link #build()} multiplies this value by 1000 before handing it
         * to the transport, so it is presumably expressed in seconds -- TODO confirm.
         */
        public TransportBuilder setTimeout(int i) {
            timeout = i;
            return this;
        }

        /** Sets the API version; it ends up in the request path as {@code /v<version>/signalflow}. */
        public TransportBuilder setAPIVersion(int i) {
            version = i;
            return this;
        }

        /**
         * Creates the transport from the values collected so far.
         *
         * @return a new transport bound to the configured endpoint
         */
        public ServerSentEventsTransport build() {
            SignalFxEndpoint target = new SignalFxEndpoint(protocol, host, port);
            Integer scaledTimeout = Integer.valueOf(timeout * 1000);
            return new ServerSentEventsTransport(token, target, version, scaledTimeout);
        }
    }

    /* loaded from: input_file:com/signalfx/signalflow/ServerSentEventsTransport$TransportChannel.class */
    /**
     * Channel backed by an open HTTP response whose body is an event stream.
     *
     * <p>The channel keeps the connection, the response and the stream parser together so
     * that {@link #close()} can release all of them.
     */
    public static class TransportChannel extends Channel {
        protected static final Logger log = LoggerFactory.getLogger((Class<?>) TransportChannel.class);
        private TransportConnection connection;
        private CloseableHttpResponse response;
        private HttpEntity responseHttpEntity;
        private TransportEventStreamParser streamParser;

        /**
         * Wraps the body of {@code closeableHttpResponse} in an event-stream parser and
         * installs that parser as this channel's iterator.
         *
         * @param transportConnection connection that produced the response; closed by {@link #close()}
         * @param closeableHttpResponse response whose entity content is parsed
         * @throws IOException if the entity content stream cannot be obtained
         */
        public TransportChannel(TransportConnection transportConnection, CloseableHttpResponse closeableHttpResponse) throws IOException {
            connection = transportConnection;
            response = closeableHttpResponse;
            HttpEntity entity = closeableHttpResponse.getEntity();
            responseHttpEntity = entity;
            TransportEventStreamParser parser = new TransportEventStreamParser(entity.getContent());
            streamParser = parser;
            iterator = parser;
            log.debug("constructed {} of type {}", this, getClass().getName());
        }

        /**
         * Closes the channel, then the response, the connection and finally the stream
         * parser. Failures while closing the response or the connection are logged and do
         * not prevent the remaining resources from being closed.
         */
        @Override // com.signalfx.signalflow.Channel, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            super.close();
            try {
                response.close();
            } catch (IOException responseFailure) {
                log.error("failed to close response", responseFailure);
            }
            try {
                connection.close();
            } catch (IOException connectionFailure) {
                log.error("failed to close connection", connectionFailure);
            }
            streamParser.close();
        }
    }

    /* loaded from: input_file:com/signalfx/signalflow/ServerSentEventsTransport$TransportConnection.class */
    /**
     * HTTP connection used by the server-sent-events transport to POST SignalFlow requests.
     *
     * <p>Each instance uses its own {@link BasicHttpClientConnectionManager} and a request
     * configuration derived from the inherited one, but with the socket timeout set to 0.
     */
    public static class TransportConnection extends AbstractHttpReceiverConnection {
        protected static final Logger log = LoggerFactory.getLogger((Class<?>) TransportConnection.class);
        public static final int DEFAULT_TIMEOUT_MS = 1000;
        protected final RequestConfig transportRequestConfig;

        /**
         * Creates a connection using {@link #DEFAULT_TIMEOUT_MS}.
         *
         * @param signalFxEndpoint endpoint to connect to
         */
        public TransportConnection(SignalFxEndpoint signalFxEndpoint) {
            this(signalFxEndpoint, DEFAULT_TIMEOUT_MS);
        }

        /**
         * Creates a connection with an explicit timeout.
         *
         * @param signalFxEndpoint endpoint to connect to
         * @param i timeout forwarded to the superclass; presumably milliseconds -- TODO confirm
         */
        public TransportConnection(SignalFxEndpoint signalFxEndpoint, int i) {
            super(signalFxEndpoint, i, new BasicHttpClientConnectionManager());
            // Socket timeout 0 is HttpClient's "no read timeout": an event stream may stay
            // silent for long periods. All other settings are copied from the base config.
            this.transportRequestConfig = RequestConfig.custom()
                    .setSocketTimeout(0)
                    .setConnectionRequestTimeout(this.requestConfig.getConnectionRequestTimeout())
                    .setConnectTimeout(this.requestConfig.getConnectTimeout())
                    .setProxy(this.requestConfig.getProxy())
                    .build();
            log.debug("constructed request config: {}", this.transportRequestConfig);
        }

        /**
         * POSTs a request and returns the still-open response when the status is 2xx.
         *
         * <p>The body is encoded as UTF-8 and the charset is declared in the Content-Type
         * header. {@code StringEntity(String)} would otherwise fall back to ISO-8859-1 and
         * turn every character outside Latin-1 into {@code '?'}.
         *
         * @param str access token, sent as the {@code X-SF-TOKEN} header
         * @param str2 request path, appended to the host URI
         * @param map query parameters to add to the URI; may be {@code null}
         * @param str3 request body; may be {@code null} for a body-less POST
         * @return the open response; the caller is responsible for closing it
         * @throws SignalFlowException on a non-2xx status (the response is closed first),
         *         on an I/O failure, or when the resulting URI is invalid
         */
        public CloseableHttpResponse post(String str, String str2, Map<String, String> map, String str3) throws SignalFlowException {
            try {
                URIBuilder uRIBuilder = new URIBuilder(String.format("%s%s", this.host.toURI(), str2));
                if (map != null) {
                    for (Map.Entry<String, String> entry : map.entrySet()) {
                        uRIBuilder.addParameter(entry.getKey(), entry.getValue());
                    }
                }
                HttpPost httpPost = new HttpPost(uRIBuilder.build());
                httpPost.setConfig(this.transportRequestConfig);
                httpPost.setHeader("X-SF-TOKEN", str);
                httpPost.setHeader("User-Agent", AbstractHttpReceiverConnection.USER_AGENT);
                httpPost.setHeader("Content-Type", "text/plain; charset=UTF-8");
                if (str3 != null) {
                    httpPost.setEntity(new StringEntity(str3, "UTF-8"));
                }
                log.debug("{}", httpPost);
                CloseableHttpResponse execute = this.client.execute((HttpUriRequest) httpPost);
                StatusLine statusLine = execute.getStatusLine();
                int statusCode = statusLine.getStatusCode();
                if (statusCode >= 200 && statusCode < 300) {
                    return execute;
                }
                // Failed request: release the response before reporting the error.
                try {
                    execute.close();
                } catch (IOException e) {
                    log.error("failed to close response", e);
                }
                throw new SignalFlowException(Integer.valueOf(statusCode), statusCode + ": failed post [ " + httpPost + " ] reason: " + statusLine.getReasonPhrase());
            } catch (IOException e2) {
                throw new SignalFlowException("failed communication. " + e2.getMessage(), e2);
            } catch (URISyntaxException e3) {
                throw new SignalFlowException("invalid uri. " + e3.getMessage(), e3);
            }
        }

        /**
         * Closes the underlying HTTP client.
         *
         * @throws IOException if the client fails to close
         */
        public void close() throws IOException {
            this.client.close();
        }
    }

    /* loaded from: input_file:com/signalfx/signalflow/ServerSentEventsTransport$TransportEventStreamParser.class */
    /**
     * Iterator that reads a server-sent-events stream and yields one {@link StreamMessage}
     * per event that carries data.
     *
     * <p>Lines are read until a blank line (or end of stream) terminates the current event.
     * Lines starting with {@code ':'} are comments and are skipped. Recognised fields are
     * {@code data}, {@code id}, {@code event} and {@code retry}; anything else is ignored.
     * Events without any {@code data} field produce no message.
     */
    public static class TransportEventStreamParser implements Iterator<StreamMessage>, Closeable {
        private static final String EVENT = "event";
        private static final String ID = "id";
        private static final String DATA = "data";
        private static final String RETRY = "retry";
        private static final String DEFAULT_EVENT = "message";
        private static final String EMPTY_STRING = "";
        private BufferedReader eventStreamReader;
        private StreamMessage nextMessage;
        private String lastEventId;
        protected static final Logger log = LoggerFactory.getLogger((Class<?>) TransportEventStreamParser.class);
        private static final Pattern DIGITS_ONLY = Pattern.compile("^[\\d]+$");
        private boolean endOfStreamReached = false;
        private int reconnectionTimeoutMs = 1000;
        private String eventNameBuffer = DEFAULT_EVENT;
        private StringBuilder dataBuffer = new StringBuilder();

        /**
         * @param inputStream raw event stream; decoded as UTF-8
         * @throws UnsupportedEncodingException if UTF-8 is not supported by the runtime
         */
        public TransportEventStreamParser(InputStream inputStream) throws UnsupportedEncodingException {
            this.eventStreamReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
        }

        /** @return the value of the most recent {@code id} field, or {@code null} if none was seen */
        public String getLastEventId() {
            return this.lastEventId;
        }

        /** @return the most recent valid {@code retry} value, or 1000 if none was seen */
        public int getReconnectionTimeoutMs() {
            return this.reconnectionTimeoutMs;
        }

        /**
         * Reads ahead until a message is available, the stream ends, or the parser is closed.
         *
         * @return {@code true} if {@link #next()} will return a message
         * @throws SignalFlowException if reading from the stream fails
         */
        @Override // java.util.Iterator
        public boolean hasNext() {
            while (!this.endOfStreamReached && this.eventStreamReader != null && this.nextMessage == null) {
                parseNext();
            }
            return this.nextMessage != null;
        }

        /**
         * @return the next message from the stream
         * @throws NoSuchElementException if the stream is exhausted or the parser is closed
         * @throws SignalFlowException if reading from the stream fails
         */
        @Override // java.util.Iterator
        public StreamMessage next() {
            if (!hasNext()) {
                throw new NoSuchElementException("no more stream messages");
            }
            StreamMessage streamMessage = this.nextMessage;
            this.nextMessage = null;
            return streamMessage;
        }

        /** Always throws: messages cannot be removed from a stream. */
        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("remove from stream not supported");
        }

        /**
         * Closes the underlying reader. A close failure is logged, not thrown. The reader
         * reference is dropped even when closing fails, so that iteration stops instead of
         * reading from a half-closed stream.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.eventStreamReader == null) {
                return;
            }
            try {
                this.eventStreamReader.close();
            } catch (IOException e) {
                log.error("failed to close event stream", e);
            } finally {
                this.eventStreamReader = null;
            }
        }

        /**
         * Reads one event block (up to the next blank line or end of stream) and stores the
         * resulting message in {@code nextMessage}, or {@code null} when the block carried
         * no data.
         */
        private void parseNext() {
            if (this.eventStreamReader == null) {
                this.nextMessage = null;
                return;
            }
            try {
                long startMs = System.currentTimeMillis();
                this.dataBuffer.setLength(0);

                String line;
                while ((line = this.eventStreamReader.readLine()) != null && !line.trim().isEmpty()) {
                    if (line.startsWith(":")) {
                        continue; // comment line
                    }
                    int colon = line.indexOf(':');
                    if (colon == -1) {
                        // A line without a colon is a field name with an empty value.
                        processField(line.trim(), EMPTY_STRING);
                        continue;
                    }
                    String value = line.substring(colon + 1);
                    // Only a single LEADING space is dropped; spaces inside the value are
                    // payload and must survive even when the leading space is absent.
                    if (value.startsWith(" ")) {
                        value = value.substring(1);
                    }
                    processField(line.substring(0, colon), value);
                }

                if (line == null) {
                    this.endOfStreamReached = true;
                    close();
                }

                if (this.dataBuffer.length() > 0) {
                    String data = this.dataBuffer.toString();
                    // processField terminates every data line with '\n'; drop the last one.
                    if (data.endsWith(IOUtils.LINE_SEPARATOR_UNIX)) {
                        data = data.substring(0, data.length() - 1);
                    }
                    // An absent or empty event name means the default event type.
                    String eventName = this.eventNameBuffer.isEmpty() ? DEFAULT_EVENT : this.eventNameBuffer;
                    this.nextMessage = new StreamMessage(eventName, this.lastEventId, data);
                } else {
                    log.debug(this.eventNameBuffer);
                    this.nextMessage = null;
                }
                // The event name belongs to a single event; never carry it into the next one.
                this.eventNameBuffer = EMPTY_STRING;
                log.debug("total stream message read/parse time (ms): {}", Long.valueOf(System.currentTimeMillis() - startMs));
            } catch (IOException e) {
                log.error("failed to parse next stream event", e);
                throw new SignalFlowException("failed to parse next stream event", e);
            }
        }

        /**
         * Applies a single {@code field: value} pair to the parser state.
         *
         * @param str field name
         * @param str2 field value, already stripped of its single leading space
         */
        private void processField(String str, String str2) {
            if (DATA.equals(str)) {
                this.dataBuffer.append(str2).append(IOUtils.LINE_SEPARATOR_UNIX);
            } else if (ID.equals(str)) {
                this.lastEventId = str2;
            } else if (EVENT.equals(str)) {
                this.eventNameBuffer = str2;
            } else if (RETRY.equals(str) && DIGITS_ONLY.matcher(str2).matches()) {
                try {
                    this.reconnectionTimeoutMs = Integer.parseInt(str2);
                } catch (NumberFormatException ignored) {
                    // All digits but larger than an int: keep the previous timeout rather
                    // than letting a malformed field abort iteration.
                }
            }
        }
    }

    /**
     * Creates a transport for the given endpoint.
     *
     * @param str access token sent with every request
     * @param signalFxEndpoint endpoint every connection of this transport targets
     * @param i API version; the request path becomes {@code /v<i>/signalflow}
     * @param num timeout handed to each connection; {@code null} selects
     *        {@link #DEFAULT_TIMEOUT} instead of failing later with a NullPointerException
     *        when the value is unboxed
     */
    protected ServerSentEventsTransport(String str, SignalFxEndpoint signalFxEndpoint, int i, Integer num) {
        this.token = str;
        this.endpoint = signalFxEndpoint;
        this.path = "/v" + i + "/signalflow";
        this.timeout = (num != null) ? num : DEFAULT_TIMEOUT;
    }

    /**
     * Attaches to a running computation by POSTing to {@code <path>/<str>/attach} and
     * returns a channel over the response stream.
     *
     * @param str identifier placed in the request path
     * @param map query parameters; may be {@code null}
     * @return a channel reading the response; the caller must close it
     * @throws SignalFlowException if the connection, request or channel setup fails; the
     *         response and connection are closed before it is thrown
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public Channel attach(String str, Map<String, String> map) {
        log.debug("attach: [ {} ] with parameters: {}", str, map);
        final String attachPath = this.path + "/" + str + "/attach";
        TransportConnection conn = null;
        CloseableHttpResponse resp = null;
        try {
            conn = new TransportConnection(this.endpoint, this.timeout.intValue());
            resp = conn.post(this.token, attachPath, map, null);
            return new TransportChannel(conn, resp);
        } catch (Exception failure) {
            close(resp);
            close(conn);
            throw new SignalFlowException("failed to create transport channel for attach", failure);
        }
    }

    /**
     * Executes a program by POSTing it to {@code <path>/execute} and returns a channel
     * over the response stream.
     *
     * @param str program text, sent as the request body
     * @param map query parameters; may be {@code null}
     * @return a channel reading the response; the caller must close it
     * @throws SignalFlowException if the request fails or the channel cannot be created;
     *         the response and connection are closed before it is thrown
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public Channel execute(String str, Map<String, String> map) throws SignalFlowException {
        if (log.isDebugEnabled()) {
            log.debug("execute: [ {} ] with parameters: {}", str, map);
        }
        TransportConnection transportConnection = null;
        CloseableHttpResponse closeableHttpResponse = null;
        try {
            transportConnection = new TransportConnection(this.endpoint, this.timeout.intValue());
            closeableHttpResponse = transportConnection.post(this.token, this.path + "/execute", map, str);
            return new TransportChannel(transportConnection, closeableHttpResponse);
        } catch (IOException e) {
            close(closeableHttpResponse);
            close(transportConnection);
            throw new SignalFlowException("failed to create transport channel for execute", e);
        } catch (RuntimeException e) {
            // post() reports HTTP and communication failures with an unchecked exception.
            // Release the connection before passing the failure on unchanged, otherwise
            // every rejected request leaks an HTTP client.
            close(closeableHttpResponse);
            close(transportConnection);
            throw e;
        }
    }

    /**
     * Runs a preflight of a program by POSTing it to {@code <path>/preflight} and returns
     * a channel over the response stream.
     *
     * @param str program text, sent as the request body
     * @param map query parameters; may be {@code null}
     * @return a channel reading the response; the caller must close it
     * @throws SignalFlowException if the request fails or the channel cannot be created;
     *         the response and connection are closed before it is thrown
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public Channel preflight(String str, Map<String, String> map) throws SignalFlowException {
        if (log.isDebugEnabled()) {
            log.debug("preflight: [ {} ] with parameters: {}", str, map);
        }
        TransportConnection transportConnection = null;
        CloseableHttpResponse closeableHttpResponse = null;
        try {
            transportConnection = new TransportConnection(this.endpoint, this.timeout.intValue());
            closeableHttpResponse = transportConnection.post(this.token, this.path + "/preflight", map, str);
            return new TransportChannel(transportConnection, closeableHttpResponse);
        } catch (IOException e) {
            close(closeableHttpResponse);
            close(transportConnection);
            throw new SignalFlowException("failed to create transport channel for preflight", e);
        } catch (RuntimeException e) {
            // post() reports HTTP and communication failures with an unchecked exception.
            // Release the connection before passing the failure on unchanged, otherwise
            // every rejected request leaks an HTTP client.
            close(closeableHttpResponse);
            close(transportConnection);
            throw e;
        }
    }

    /**
     * Starts a program by POSTing it to {@code <path>/start}. The response is not read;
     * the response and connection are always closed before this method returns or throws.
     *
     * @param str program text, sent as the request body
     * @param map query parameters; may be {@code null}
     * @throws SignalFlowException if the connection or the request fails
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public void start(String str, Map<String, String> map) {
        if (log.isDebugEnabled()) {
            log.debug("start: [ {} ] with parameters: {}", str, map);
        }
        TransportConnection transportConnection = null;
        CloseableHttpResponse closeableHttpResponse = null;
        try {
            transportConnection = new TransportConnection(this.endpoint, this.timeout.intValue());
            closeableHttpResponse = transportConnection.post(this.token, this.path + "/start", map, str);
        } catch (Exception e) {
            throw new SignalFlowException("failed to start program - " + str, e);
        } finally {
            // Single cleanup point for both the success and the failure path.
            close(closeableHttpResponse);
            close(transportConnection);
        }
    }

    /**
     * Stops a computation by POSTing to {@code <path>/<str>/stop}. The response is not
     * read; the response and connection are always closed before this method returns or
     * throws.
     *
     * @param str identifier placed in the request path
     * @param map query parameters; may be {@code null}
     * @throws SignalFlowException if the connection or the request fails
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public void stop(String str, Map<String, String> map) {
        if (log.isDebugEnabled()) {
            log.debug("stop: [ {} ] with parameters: {}", str, map);
        }
        TransportConnection transportConnection = null;
        CloseableHttpResponse closeableHttpResponse = null;
        try {
            transportConnection = new TransportConnection(this.endpoint, this.timeout.intValue());
            closeableHttpResponse = transportConnection.post(this.token, this.path + "/" + str + "/stop", map, null);
        } catch (Exception e) {
            throw new SignalFlowException("failed to stop program - " + str, e);
        } finally {
            // Single cleanup point for both the success and the failure path.
            close(closeableHttpResponse);
            close(transportConnection);
        }
    }

    /**
     * Sends a keepalive by POSTing to {@code <path>/<str>/keepalive} without parameters or
     * body. The response is not read; the response and connection are always closed before
     * this method returns or throws.
     *
     * @param str identifier placed in the request path
     * @throws SignalFlowException if the connection or the request fails
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public void keepalive(String str) {
        if (log.isDebugEnabled()) {
            log.debug("keepalive: [ {} ]", str);
        }
        TransportConnection transportConnection = null;
        CloseableHttpResponse closeableHttpResponse = null;
        try {
            transportConnection = new TransportConnection(this.endpoint, this.timeout.intValue());
            closeableHttpResponse = transportConnection.post(this.token, this.path + "/" + str + "/keepalive", null, null);
        } catch (Exception e) {
            throw new SignalFlowException("failed to set keepalive for program - " + str, e);
        } finally {
            // Single cleanup point for both the success and the failure path.
            close(closeableHttpResponse);
            close(transportConnection);
        }
    }

    /**
     * Does nothing: the body is empty and both arguments are ignored. Every request method
     * of this class creates its own TransportConnection, and this class declares no field
     * that holds a connection.
     *
     * @param i unused
     * @param str unused
     */
    @Override // com.signalfx.signalflow.SignalFlowTransport
    public void close(int i, String str) {
    }

    /**
     * Closes a response if one was obtained; a close failure is logged, never thrown.
     *
     * @param closeableHttpResponse response to close; {@code null} is ignored
     */
    private void close(CloseableHttpResponse closeableHttpResponse) {
        if (closeableHttpResponse == null) {
            return;
        }
        try {
            closeableHttpResponse.close();
        } catch (IOException closeFailure) {
            log.error("error closing response", closeFailure);
        }
    }

    /**
     * Closes a connection if one was created; a close failure is logged, never thrown.
     *
     * @param transportConnection connection to close; {@code null} is ignored
     */
    private void close(TransportConnection transportConnection) {
        if (transportConnection == null) {
            return;
        }
        try {
            transportConnection.close();
        } catch (IOException closeFailure) {
            log.error("error closing transport connection", closeFailure);
        }
    }
}
