package io.harness.cf.client.connector;

import com.google.gson.Gson;
import io.harness.cf.client.common.SdkCodes;
import io.harness.cf.client.common.Utils;
import io.harness.cf.client.dto.Message;
import io.harness.cf.client.logger.LogUtil;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import lombok.NonNull;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.EventListener;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/harness/cf/client/connector/EventSource.class */
public class EventSource implements Callback, AutoCloseable, Service {
    private static final Logger log = LoggerFactory.getLogger(EventSource.class);
    private final Updater updater;
    private final Gson gson;
    private final HttpLoggingInterceptor loggingInterceptor;
    private final long retryBackoffDelay;
    private OkHttpClient streamClient;
    private Call call;
    private final String url;
    private final Map<String, String> headers;
    private final long sseReadTimeoutMins;
    private final List<X509Certificate> trustedCAs;

    /* loaded from: input_file:io/harness/cf/client/connector/EventSource$SSEStreamException.class */
    private static class SSEStreamException extends RuntimeException {
        public SSEStreamException(String str, Throwable th) {
            super(str, th);
        }

        public SSEStreamException(String str) {
            super(str);
        }
    }

    public EventSource(@NonNull String str, Map<String, String> map, @NonNull Updater updater, long j) throws ConnectorException {
        this(str, map, updater, j, 2000, null);
        if (str == null) {
            throw new NullPointerException("url is marked non-null but is null");
        }
        if (updater == null) {
            throw new NullPointerException("updater is marked non-null but is null");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventSource(@NonNull String str, Map<String, String> map, @NonNull Updater updater, long j, int i, List<X509Certificate> list) {
        this.gson = new Gson();
        if (str == null) {
            throw new NullPointerException("url is marked non-null but is null");
        }
        if (updater == null) {
            throw new NullPointerException("updater is marked non-null but is null");
        }
        this.url = str;
        this.headers = map;
        this.updater = updater;
        this.sseReadTimeoutMins = j;
        this.retryBackoffDelay = i;
        this.trustedCAs = list;
        this.loggingInterceptor = new HttpLoggingInterceptor();
    }

    protected OkHttpClient makeStreamClient(long j, List<X509Certificate> list) throws ConnectorException {
        OkHttpClient.Builder retryOnConnectionFailure = new OkHttpClient.Builder().eventListener(EventListener.NONE).readTimeout(j, TimeUnit.MINUTES).retryOnConnectionFailure(true);
        setupTls(retryOnConnectionFailure, list);
        if (log.isDebugEnabled()) {
            retryOnConnectionFailure.addInterceptor(this.loggingInterceptor);
        } else {
            retryOnConnectionFailure.interceptors().remove(this.loggingInterceptor);
        }
        retryOnConnectionFailure.addInterceptor(new NewRetryInterceptor(this.retryBackoffDelay));
        return retryOnConnectionFailure.build();
    }

    private void setupTls(OkHttpClient.Builder builder, List<X509Certificate> list) throws ConnectorException {
        if (list != null) {
            try {
                if (!list.isEmpty()) {
                    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
                    keyStore.load(null, null);
                    for (int i = 0; i < list.size(); i++) {
                        keyStore.setCertificateEntry("ca" + i, list.get(i));
                    }
                    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                    trustManagerFactory.init(keyStore);
                    TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
                    SSLContext sSLContext = SSLContext.getInstance("TLS");
                    sSLContext.init(null, trustManagers, new SecureRandom());
                    builder.sslSocketFactory(sSLContext.getSocketFactory(), (X509TrustManager) trustManagers[0]);
                }
            } catch (IOException | GeneralSecurityException e) {
                String str = "Failed to setup TLS on SSE endpoint: " + e.getMessage();
                log.warn(str, e);
                throw new ConnectorException(str, true, e);
            }
        }
    }

    @Override // io.harness.cf.client.connector.Service
    public void start() throws ConnectorException, InterruptedException {
        log.info("EventSource connecting with url {} and headers {}", this.url, Utils.redactHeaders(this.headers));
        this.streamClient = makeStreamClient(this.sseReadTimeoutMins, this.trustedCAs);
        Request.Builder addHeader = new Request.Builder().url(this.url).addHeader("User-Agent", "JavaSDK 1.7.0").addHeader("X-Request-ID", UUID.randomUUID().toString());
        Map<String, String> map = this.headers;
        Objects.requireNonNull(addHeader);
        map.forEach(addHeader::header);
        this.call = this.streamClient.newCall(addHeader.build());
        this.call.enqueue(this);
        this.updater.onReady();
    }

    @Override // io.harness.cf.client.connector.Service
    public void stop() {
        log.debug("Stopping EventSource service.");
        if (this.call != null) {
            this.call.cancel();
        }
    }

    @Override // java.lang.AutoCloseable, io.harness.cf.client.connector.Service
    public void close() {
        stop();
        if (this.streamClient != null) {
            this.streamClient.connectionPool().evictAll();
        }
        log.debug("EventSource closed");
    }

    public void onFailure(@NotNull Call call, @NotNull IOException iOException) {
        log.warn("SSE stream error", iOException);
        this.updater.onDisconnected(iOException.getMessage());
    }

    public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
        log.debug("SSE stream data: {}", response.message());
        try {
            if (!response.isSuccessful()) {
                throw new SSEStreamException("Invalid SSE HTTP response: " + response);
            }
            if (response.body() == null) {
                throw new SSEStreamException("Invalid SSE HTTP response: empty body");
            }
            this.updater.onConnected();
            BufferedSource source = response.body().source();
            while (true) {
                String readUtf8Line = source.readUtf8Line();
                if (readUtf8Line == null) {
                    log.warn("End of SSE stream");
                    this.updater.onDisconnected("End of SSE stream");
                    return;
                } else {
                    log.debug("SSE stream data: {}", readUtf8Line);
                    if (readUtf8Line.startsWith("data:")) {
                        Message message = (Message) this.gson.fromJson(readUtf8Line.substring(6), Message.class);
                        SdkCodes.infoStreamEventReceived(readUtf8Line.substring(6));
                        this.updater.update(message);
                    }
                }
            }
        } catch (Throwable th) {
            log.warn("SSE Stream aborted: " + getExceptionMsg(th));
            log.trace("SSE Stream aborted trace", th);
            this.updater.onDisconnected(getExceptionMsg(th));
        }
    }

    private String getExceptionMsg(Throwable th) {
        return (th.getMessage() == null || "null".equals(th.getMessage())) ? th.getClass().getSimpleName() : th.getMessage();
    }

    static {
        LogUtil.setSystemProps();
    }
}
