package co.cask.cdap.client;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.client.config.ClientConfig;
import co.cask.cdap.client.util.RESTClient;
import co.cask.cdap.common.exception.BadRequestException;
import co.cask.cdap.common.exception.StreamNotFoundException;
import co.cask.cdap.common.exception.UnauthorizedException;
import co.cask.cdap.common.stream.StreamEventTypeAdapter;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.StreamDetail;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.security.authentication.client.AccessToken;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import co.cask.common.http.ObjectResponse;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import javax.inject.Inject;
import javax.net.ssl.HttpsURLConnection;

/* loaded from: input_file:co/cask/cdap/client/StreamClient.class */
public class StreamClient {
    private static final Gson GSON = StreamEventTypeAdapter.register(new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter())).create();
    private final RESTClient restClient;
    private final ClientConfig config;

    @Inject
    public StreamClient(ClientConfig clientConfig, RESTClient rESTClient) {
        this.config = clientConfig;
        this.restClient = rESTClient;
    }

    public StreamClient(ClientConfig clientConfig) {
        this.config = clientConfig;
        this.restClient = new RESTClient(clientConfig);
    }

    public StreamProperties getConfig(String str) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        HttpResponse execute = this.restClient.execute(HttpMethod.GET, this.config.resolveNamespacedURLV3(String.format("streams/%s", str)), this.config.getAccessToken(), 404);
        if (execute.getResponseCode() == 404) {
            throw new StreamNotFoundException(from);
        }
        return (StreamProperties) GSON.fromJson(execute.getResponseBodyAsString(Charsets.UTF_8), StreamProperties.class);
    }

    public void setStreamProperties(String str, StreamProperties streamProperties) throws IOException, UnauthorizedException, BadRequestException, StreamNotFoundException {
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        HttpResponse execute = this.restClient.execute(HttpRequest.put(this.config.resolveNamespacedURLV3(String.format("streams/%s/properties", str))).withBody(GSON.toJson(streamProperties)).build(), this.config.getAccessToken(), 404, 400);
        if (execute.getResponseCode() == 400) {
            throw new BadRequestException("Bad request: " + execute.getResponseBodyAsString());
        }
        if (execute.getResponseCode() == 404) {
            throw new StreamNotFoundException(from);
        }
    }

    public void create(String str) throws IOException, BadRequestException, UnauthorizedException {
        HttpResponse execute = this.restClient.execute(HttpMethod.PUT, this.config.resolveNamespacedURLV3(String.format("streams/%s", str)), this.config.getAccessToken(), 400);
        if (execute.getResponseCode() == 400) {
            throw new BadRequestException("Bad request: " + execute.getResponseBodyAsString());
        }
    }

    public void sendEvent(String str, String str2) throws IOException, StreamNotFoundException, UnauthorizedException {
        writeEvent(this.config.resolveNamespacedURLV3(String.format("streams/%s", str)), str, str2);
    }

    public void asyncSendEvent(String str, String str2) throws IOException, StreamNotFoundException, UnauthorizedException {
        writeEvent(this.config.resolveNamespacedURLV3(String.format("streams/%s/async", str)), str, str2);
    }

    public void sendFile(String str, String str2, File file) throws IOException, StreamNotFoundException, UnauthorizedException {
        sendBatch(str, str2, Files.newInputStreamSupplier(file));
    }

    public void sendBatch(String str, String str2, InputSupplier<? extends InputStream> inputSupplier) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        if (this.restClient.upload(HttpRequest.post(this.config.resolveNamespacedURLV3(String.format("streams/%s/batch", str))).addHeaders(ImmutableMap.of("Content-type", str2)).withBody(inputSupplier).build(), this.config.getAccessToken(), 404).getResponseCode() == 404) {
            throw new StreamNotFoundException(from);
        }
    }

    public void truncate(String str) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        if (this.restClient.execute(HttpMethod.POST, this.config.resolveNamespacedURLV3(String.format("streams/%s/truncate", str)), this.config.getAccessToken(), 404).getResponseCode() == 404) {
            throw new StreamNotFoundException(from);
        }
    }

    public void setTTL(String str, long j) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        if (this.restClient.execute(HttpRequest.put(this.config.resolveNamespacedURLV3(String.format("streams/%s/properties", str))).withBody(GSON.toJson(ImmutableMap.of("ttl", Long.valueOf(j)))).build(), this.config.getAccessToken(), 404).getResponseCode() == 404) {
            throw new StreamNotFoundException(from);
        }
    }

    public List<StreamDetail> list() throws IOException, UnauthorizedException {
        return (List) ObjectResponse.fromJsonBody(this.restClient.execute(HttpMethod.GET, this.config.resolveNamespacedURLV3("streams"), this.config.getAccessToken(), new int[0]), new TypeToken<List<StreamDetail>>() { // from class: co.cask.cdap.client.StreamClient.1
        }).getResponseObject();
    }

    public <T extends Collection<? super StreamEvent>> T getEvents(String str, long j, long j2, int i, final T t) throws IOException, StreamNotFoundException, UnauthorizedException {
        getEvents(str, j, j2, i, (Function<? super StreamEvent, Boolean>) new Function<StreamEvent, Boolean>() { // from class: co.cask.cdap.client.StreamClient.2
            public Boolean apply(StreamEvent streamEvent) {
                t.add(streamEvent);
                return true;
            }
        });
        return t;
    }

    public void getEvents(String str, long j, long j2, int i, Function<? super StreamEvent, Boolean> function) throws IOException, StreamNotFoundException, UnauthorizedException {
        Boolean bool;
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        HttpURLConnection httpURLConnection = (HttpURLConnection) this.config.resolveNamespacedURLV3(String.format("streams/%s/events?start=%d&end=%d&limit=%d", str, Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i))).openConnection();
        AccessToken accessToken = this.config.getAccessToken();
        if (accessToken != null) {
            httpURLConnection.setRequestProperty("Authorization", accessToken.getTokenType() + " " + accessToken.getValue());
        }
        if ((httpURLConnection instanceof HttpsURLConnection) && !this.config.isVerifySSLCert()) {
            try {
                HttpRequests.disableCertCheck((HttpsURLConnection) httpURLConnection);
            } catch (Exception e) {
            }
        }
        try {
            if (httpURLConnection.getResponseCode() == 401) {
                throw new UnauthorizedException("Unauthorized status code received from the server.");
            }
            if (httpURLConnection.getResponseCode() == 404) {
                throw new StreamNotFoundException(from);
            }
            if (httpURLConnection.getResponseCode() == 204) {
                return;
            }
            InputStream inputStream = httpURLConnection.getInputStream();
            JsonReader jsonReader = new JsonReader(new InputStreamReader(inputStream, Charsets.UTF_8));
            jsonReader.beginArray();
            while (jsonReader.peek() != JsonToken.END_ARRAY && (bool = (Boolean) function.apply(GSON.fromJson(jsonReader, StreamEvent.class))) != null && bool.booleanValue()) {
            }
            drain(inputStream);
            httpURLConnection.disconnect();
        } finally {
            httpURLConnection.disconnect();
        }
    }

    private void writeEvent(URL url, String str, String str2) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream from = Id.Stream.from(this.config.getNamespace(), str);
        if (this.restClient.execute(HttpRequest.post(url).withBody(str2).build(), this.config.getAccessToken(), 404).getResponseCode() == 404) {
            throw new StreamNotFoundException(from);
        }
    }

    private void drain(InputStream inputStream) throws IOException {
        do {
        } while (inputStream.skip(Long.MAX_VALUE) > 0);
    }
}
