package co.cask.cdap.client.rest;

import co.cask.cdap.client.StreamClient;
import co.cask.cdap.client.StreamWriter;
import co.cask.cdap.security.authentication.client.AuthenticationClient;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.config.Registry;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/client/rest/RestStreamClient.class */
public class RestStreamClient implements StreamClient {
    private static final String DEFAULT_VERSION = "v3";
    private static final String DEFAULT_NAMESPACE = "default";
    private static final String TTL_ATTRIBUTE_NAME = "ttl";
    private static final int DEFAULT_WRITER_POOL_SIZE = 10;
    private final RestClientConnectionConfig config;
    private final int writerPoolSize;
    private final RestClient restClient;
    private Registry<ConnectionSocketFactory> connectionRegistry;
    private static final Logger LOG = LoggerFactory.getLogger(RestStreamClient.class);
    private static final Gson GSON = new Gson();

    /* loaded from: input_file:co/cask/cdap/client/rest/RestStreamClient$Builder.class */
    public static class Builder {
        private final int port;
        private final String host;
        private AuthenticationClient authClient;
        private String apiKey;
        private boolean ssl = false;
        private boolean verifySSLCert = true;
        private int writerPoolSize = RestStreamClient.DEFAULT_WRITER_POOL_SIZE;
        private String version = RestStreamClient.DEFAULT_VERSION;
        private String namespace = RestStreamClient.DEFAULT_NAMESPACE;

        public Builder(String str, int i) {
            this.host = str;
            this.port = i;
        }

        public Builder ssl(boolean z) {
            this.ssl = z;
            return this;
        }

        public Builder verifySSLCert(boolean z) {
            this.verifySSLCert = z;
            return this;
        }

        public Builder authClient(AuthenticationClient authenticationClient) {
            this.authClient = authenticationClient;
            return this;
        }

        public Builder apiKey(String str) {
            this.apiKey = str;
            return this;
        }

        public Builder writerPoolSize(int i) {
            this.writerPoolSize = i;
            return this;
        }

        public Builder version(String str) {
            this.version = str;
            return this;
        }

        public Builder namespace(String str) {
            this.namespace = str;
            return this;
        }

        public RestStreamClient build() {
            return new RestStreamClient(this);
        }
    }

    private RestStreamClient(Builder builder) {
        this.writerPoolSize = builder.writerPoolSize;
        this.config = new RestClientConnectionConfig(builder.host, builder.port, builder.authClient, builder.apiKey, builder.ssl, builder.version, builder.namespace);
        if (!builder.verifySSLCert) {
            try {
                this.connectionRegistry = RestUtil.getRegistryWithDisabledCertCheck();
            } catch (KeyManagementException e) {
                LOG.error("Failed to init SSL context: {}", e);
            } catch (NoSuchAlgorithmException e2) {
                LOG.error("Failed to get instance of SSL context: {}", e2);
            }
        }
        this.restClient = new RestClient(this.config, createConnectionManager());
    }

    @Override // co.cask.cdap.client.StreamClient
    public void create(String str) throws IOException {
        CloseableHttpResponse execute = this.restClient.execute(new HttpPut(this.restClient.resolve(String.format("/streams/%s", str))));
        try {
            LOG.debug("Create Stream Response Code : {}", Integer.valueOf(execute.getStatusLine().getStatusCode()));
            RestClient.responseCodeAnalysis(execute);
            execute.close();
        } catch (Throwable th) {
            execute.close();
            throw th;
        }
    }

    @Override // co.cask.cdap.client.StreamClient
    public void setTTL(String str, long j) throws IOException {
        HttpPut httpPut = new HttpPut(this.restClient.resolve("v2".equals(this.restClient.getVersion()) ? String.format("/streams/%s/config", str) : String.format("/streams/%s/properties", str)));
        StringEntity stringEntity = new StringEntity(GSON.toJson(ImmutableMap.of(TTL_ATTRIBUTE_NAME, Long.valueOf(j))));
        stringEntity.setContentType("application/json");
        httpPut.setEntity(stringEntity);
        CloseableHttpResponse execute = this.restClient.execute(httpPut);
        try {
            LOG.debug("Set TTL Response Code : {}", Integer.valueOf(execute.getStatusLine().getStatusCode()));
            RestClient.responseCodeAnalysis(execute);
            execute.close();
        } catch (Throwable th) {
            execute.close();
            throw th;
        }
    }

    @Override // co.cask.cdap.client.StreamClient
    public long getTTL(String str) throws IOException {
        CloseableHttpResponse execute = this.restClient.execute(new HttpGet(this.restClient.resolve("v2".equals(this.restClient.getVersion()) ? String.format("/streams/%s/info", str) : String.format("/streams/%s", str))));
        try {
            LOG.debug("Get TTL Response Code : {}", Integer.valueOf(execute.getStatusLine().getStatusCode()));
            RestClient.responseCodeAnalysis(execute);
            long asLong = RestClient.toJsonObject(execute.getEntity()).get(TTL_ATTRIBUTE_NAME).getAsLong();
            execute.close();
            return asLong;
        } catch (Throwable th) {
            execute.close();
            throw th;
        }
    }

    @Override // co.cask.cdap.client.StreamClient
    public void truncate(String str) throws IOException {
        CloseableHttpResponse execute = this.restClient.execute(new HttpPost(this.restClient.resolve(String.format("/streams/%s/truncate", str))));
        try {
            LOG.debug("Truncate stream Response Code : {}", Integer.valueOf(execute.getStatusLine().getStatusCode()));
            RestClient.responseCodeAnalysis(execute);
            execute.close();
        } catch (Throwable th) {
            execute.close();
            throw th;
        }
    }

    @Override // co.cask.cdap.client.StreamClient
    public StreamWriter createWriter(String str) throws IOException {
        LOG.debug("The Stream with id {} exists. Got the current Stream TTL value {} successfully.", str, Long.valueOf(getTTL(str)));
        PoolingHttpClientConnectionManager createConnectionManager = createConnectionManager();
        createConnectionManager.setMaxTotal(this.writerPoolSize);
        createConnectionManager.setDefaultMaxPerRoute(this.writerPoolSize);
        return new RestStreamWriter(new RestClient(this.config, createConnectionManager), this.writerPoolSize, str);
    }

    @Override // co.cask.cdap.client.StreamClient, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.restClient.close();
    }

    private PoolingHttpClientConnectionManager createConnectionManager() {
        return this.connectionRegistry != null ? new PoolingHttpClientConnectionManager(this.connectionRegistry) : new PoolingHttpClientConnectionManager();
    }

    public static Builder builder(String str, int i) {
        return new Builder(str, i);
    }
}
