package co.cask.cdap.client;

import co.cask.cdap.api.annotation.Beta;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.client.config.ClientConfig;
import co.cask.cdap.client.util.RESTClient;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.StreamNotFoundException;
import co.cask.cdap.common.UnauthenticatedException;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.stream.StreamEventTypeAdapter;
import co.cask.cdap.common.utils.TimeMathParser;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.StreamDetail;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.security.authentication.client.AccessToken;
import co.cask.cdap.security.spi.authorization.UnauthorizedException;
import co.cask.common.ContentProvider;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import co.cask.common.http.ObjectResponse;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import io.netty.handler.codec.rtsp.RtspHeaders;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.net.ssl.HttpsURLConnection;
import javax.servlet.http.HttpServletResponse;
import org.apache.tephra.Transaction;

/**
 * Client for the stream REST endpoints exposed under a namespace
 * ({@code streams/<stream>[/...]}).
 *
 * <p>Every call resolves its URL through the supplied {@link ClientConfig} and sends the
 * access token from that config (if any). Endpoints that may answer {@code 404} translate
 * that status into {@link StreamNotFoundException}.
 */
@Beta
public class StreamClient {
    /** Gson instance with the {@link Schema} and stream-event type adapters registered. */
    private static final Gson GSON = StreamEventTypeAdapter.register(
        new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter())).create();

    /** Type token used to decode the JSON array returned by {@link #list(NamespaceId)}. */
    private static final TypeToken<List<StreamDetail>> STREAM_DETAIL_LIST_TYPE = new TypeToken<List<StreamDetail>>() {
    };

    private final RESTClient restClient;
    private final ClientConfig config;

    /**
     * Creates a client that issues requests through the given {@link RESTClient}.
     *
     * @param clientConfig configuration used to resolve URLs and obtain the access token
     * @param rESTClient   REST client used to execute the requests
     */
    @Inject
    public StreamClient(ClientConfig clientConfig, RESTClient rESTClient) {
        this.config = clientConfig;
        this.restClient = rESTClient;
    }

    /**
     * Creates a client with a new {@link RESTClient} built from the given configuration.
     *
     * @param clientConfig configuration used to resolve URLs and obtain the access token
     */
    public StreamClient(ClientConfig clientConfig) {
        this(clientConfig, new RESTClient(clientConfig));
    }

    /**
     * Fetches the properties of a stream ({@code GET streams/<stream>}).
     *
     * @param streamId the stream to look up
     * @return the properties decoded from the JSON response body
     * @throws StreamNotFoundException if the server answers 404
     */
    public StreamProperties getConfig(StreamId streamId) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        HttpResponse response = this.restClient.execute(
            HttpMethod.GET, resolveStreamUrl(streamId, ""), this.config.getAccessToken(),
            HttpURLConnection.HTTP_NOT_FOUND);
        throwIfNotFound(response, streamId);
        return GSON.fromJson(response.getResponseBodyAsString(Charsets.UTF_8), StreamProperties.class);
    }

    /**
     * Replaces the properties of a stream ({@code PUT streams/<stream>/properties}).
     *
     * @param streamId         the stream to update
     * @param streamProperties the properties to send, serialized as JSON
     * @throws BadRequestException     if the server answers 400
     * @throws StreamNotFoundException if the server answers 404
     */
    public void setStreamProperties(StreamId streamId, StreamProperties streamProperties) throws IOException, UnauthenticatedException, BadRequestException, StreamNotFoundException, UnauthorizedException {
        HttpRequest request = HttpRequest.put(resolveStreamUrl(streamId, "/properties"))
            .withBody(GSON.toJson(streamProperties))
            .build();
        HttpResponse response = this.restClient.execute(
            request, this.config.getAccessToken(),
            HttpURLConnection.HTTP_NOT_FOUND, HttpURLConnection.HTTP_BAD_REQUEST);
        if (response.getResponseCode() == HttpURLConnection.HTTP_BAD_REQUEST) {
            throw new BadRequestException("Bad request: " + response.getResponseBodyAsString());
        }
        throwIfNotFound(response, streamId);
    }

    /**
     * Creates a stream without sending any properties.
     *
     * @param streamId the stream to create
     * @throws BadRequestException if the server answers 400
     */
    public void create(StreamId streamId) throws IOException, BadRequestException, UnauthenticatedException, UnauthorizedException {
        create(streamId, null);
    }

    /**
     * Creates a stream ({@code PUT streams/<stream>}), optionally sending initial properties.
     *
     * @param streamId         the stream to create
     * @param streamProperties properties to send as the JSON body, or {@code null} for no body
     * @throws BadRequestException if the server answers 400
     */
    public void create(StreamId streamId, @Nullable StreamProperties streamProperties) throws IOException, BadRequestException, UnauthenticatedException, UnauthorizedException {
        HttpRequest.Builder builder = HttpRequest.put(resolveStreamUrl(streamId, ""));
        if (streamProperties != null) {
            builder = builder.withBody(GSON.toJson(streamProperties));
        }
        HttpResponse response = this.restClient.execute(
            builder.build(), this.config.getAccessToken(), HttpURLConnection.HTTP_BAD_REQUEST);
        if (response.getResponseCode() == HttpURLConnection.HTTP_BAD_REQUEST) {
            throw new BadRequestException("Bad request: " + response.getResponseBodyAsString());
        }
    }

    /**
     * Sets the description of a stream by sending {@code {"description": ...}} to
     * {@code PUT streams/<stream>/properties}.
     *
     * @param streamId the stream to update
     * @param str      the new description
     * @throws StreamNotFoundException if the server answers 404
     */
    public void setDescription(StreamId streamId, String str) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        HttpRequest request = HttpRequest.put(resolveStreamUrl(streamId, "/properties"))
            .withBody(GSON.toJson(ImmutableMap.of("description", str)))
            .build();
        throwIfNotFound(
            this.restClient.execute(request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND),
            streamId);
    }

    /**
     * Writes one event to a stream ({@code POST streams/<stream>}).
     *
     * @param streamId the target stream
     * @param str      the event body
     * @throws StreamNotFoundException if the server answers 404
     */
    public void sendEvent(StreamId streamId, String str) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        writeEvent(resolveStreamUrl(streamId, ""), streamId, str);
    }

    /**
     * Writes one event to a stream through the {@code streams/<stream>/async} endpoint.
     *
     * @param streamId the target stream
     * @param str      the event body
     * @throws StreamNotFoundException if the server answers 404
     */
    public void asyncSendEvent(StreamId streamId, String str) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        writeEvent(resolveStreamUrl(streamId, "/async"), streamId, str);
    }

    /**
     * Uploads the content of a file to the batch endpoint of a stream.
     *
     * @param streamId the target stream
     * @param str      value sent in the {@code Content-type} header
     * @param file     the file to upload; must not be {@code null}
     * @throws StreamNotFoundException if the server answers 404
     */
    public void sendFile(StreamId streamId, String str, File file) throws IOException, StreamNotFoundException, UnauthenticatedException {
        Preconditions.checkNotNull(file);
        // The stream is opened lazily, each time the upload asks the provider for its input.
        sendBatch(streamId, str, () -> new FileInputStream(file));
    }

    /**
     * Uploads a batch of events ({@code POST streams/<stream>/batch}).
     *
     * @param streamId        the target stream
     * @param str             value sent in the {@code Content-type} header
     * @param contentProvider supplies the request body
     * @throws StreamNotFoundException if the server answers 404
     */
    public void sendBatch(StreamId streamId, String str, ContentProvider<? extends InputStream> contentProvider) throws IOException, StreamNotFoundException, UnauthenticatedException {
        HttpRequest request = HttpRequest.post(resolveStreamUrl(streamId, "/batch"))
            .addHeaders(ImmutableMap.of("Content-type", str))
            .withBody(contentProvider)
            .build();
        throwIfNotFound(
            this.restClient.upload(request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND),
            streamId);
    }

    /**
     * Truncates a stream ({@code POST streams/<stream>/truncate}).
     *
     * @param streamId the stream to truncate
     * @throws StreamNotFoundException if the server answers 404
     */
    public void truncate(StreamId streamId) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        throwIfNotFound(
            this.restClient.execute(
                HttpMethod.POST, resolveStreamUrl(streamId, "/truncate"), this.config.getAccessToken(),
                HttpURLConnection.HTTP_NOT_FOUND),
            streamId);
    }

    /**
     * Deletes a stream ({@code DELETE streams/<stream>}).
     *
     * @param streamId the stream to delete
     * @throws StreamNotFoundException if the server answers 404
     */
    public void delete(StreamId streamId) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        throwIfNotFound(
            this.restClient.execute(
                HttpMethod.DELETE, resolveStreamUrl(streamId, ""), this.config.getAccessToken(),
                HttpURLConnection.HTTP_NOT_FOUND),
            streamId);
    }

    /**
     * Sets the TTL of a stream by sending {@code {"ttl": j}} to
     * {@code PUT streams/<stream>/properties}.
     *
     * @param streamId the stream to update
     * @param j        the TTL value sent to the server (unit is defined by the server API)
     * @throws StreamNotFoundException if the server answers 404
     */
    public void setTTL(StreamId streamId, long j) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        // The property key is the plain string "ttl"; it has nothing to do with RTSP headers.
        HttpRequest request = HttpRequest.put(resolveStreamUrl(streamId, "/properties"))
            .withBody(GSON.toJson(ImmutableMap.of("ttl", j)))
            .build();
        throwIfNotFound(
            this.restClient.execute(request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND),
            streamId);
    }

    /**
     * Lists the streams of a namespace.
     *
     * @param namespaceId the namespace to query
     * @return the stream details decoded from the JSON response body
     */
    public List<StreamDetail> list(NamespaceId namespaceId) throws IOException, UnauthenticatedException, UnauthorizedException {
        HttpResponse response = this.restClient.execute(
            HttpMethod.GET,
            this.config.resolveNamespacedURLV3(namespaceId, Constants.Service.STREAMS),
            this.config.getAccessToken());
        return ObjectResponse.fromJsonBody(response, STREAM_DETAIL_LIST_TYPE).getResponseObject();
    }

    /**
     * Reads events from a stream and appends each one to the given collection.
     *
     * @param streamId the stream to read
     * @param str      start time, in a format accepted by {@link TimeMathParser#parseTime}
     * @param str2     end time, in a format accepted by {@link TimeMathParser#parseTime}
     * @param i        value of the {@code limit} query parameter
     * @param t        collection that receives the events
     * @return the same collection instance that was passed in
     * @throws StreamNotFoundException if the server answers 404
     */
    public <T extends Collection<? super StreamEvent>> T getEvents(StreamId streamId, String str, String str2, int i, final T t) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        getEvents(streamId, str, str2, i, new Function<StreamEvent, Boolean>() {
            @Override
            public Boolean apply(StreamEvent streamEvent) {
                t.add(streamEvent);
                // Always ask for the next event; the limit parameter bounds the read.
                return true;
            }
        });
        return t;
    }

    /**
     * Reads events from a stream and appends each one to the given collection, with the
     * time range given as numbers that are passed on in their decimal string form.
     *
     * @param streamId the stream to read
     * @param j        start time
     * @param j2       end time
     * @param i        value of the {@code limit} query parameter
     * @param t        collection that receives the events
     * @return the same collection instance that was passed in
     * @throws StreamNotFoundException if the server answers 404
     */
    public <T extends Collection<? super StreamEvent>> T getEvents(StreamId streamId, long j, long j2, int i, T t) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        return getEvents(streamId, String.valueOf(j), String.valueOf(j2), i, t);
    }

    /**
     * Reads events from a stream and hands each one to a callback, with the time range
     * given as numbers that are passed on in their decimal string form.
     *
     * @param streamId the stream to read
     * @param j        start time
     * @param j2       end time
     * @param i        value of the {@code limit} query parameter
     * @param function callback invoked per event; reading stops when it returns
     *                 {@code null} or {@code false}
     * @throws StreamNotFoundException if the server answers 404
     */
    public void getEvents(StreamId streamId, long j, long j2, int i, Function<? super StreamEvent, Boolean> function) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        getEvents(streamId, String.valueOf(j), String.valueOf(j2), i, function);
    }

    /**
     * Reads events from a stream and hands each one to a callback as it is parsed from
     * the response, without buffering the whole result.
     *
     * <p>This method opens its own {@link HttpURLConnection} instead of going through the
     * {@link RESTClient}, so it maps the response status codes itself.
     *
     * @param streamId the stream to read
     * @param str      start time, in a format accepted by {@link TimeMathParser#parseTime}
     * @param str2     end time, in a format accepted by {@link TimeMathParser#parseTime}
     * @param i        value of the {@code limit} query parameter
     * @param function callback invoked per event; reading stops when it returns
     *                 {@code null} or {@code false}
     * @throws UnauthenticatedException if the server answers 401
     * @throws UnauthorizedException    if the server answers 403
     * @throws StreamNotFoundException  if the server answers 404
     */
    public void getEvents(StreamId streamId, String str, String str2, int i, Function<? super StreamEvent, Boolean> function) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        long startMillis = TimeMathParser.parseTime(str, TimeUnit.MILLISECONDS);
        long endMillis = TimeMathParser.parseTime(str2, TimeUnit.MILLISECONDS);
        URL url = this.config.resolveNamespacedURLV3(
            streamId.getParent(),
            String.format("streams/%s/events?start=%d&end=%d&limit=%d", streamId.getStream(), startMillis, endMillis, i));

        HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
        AccessToken accessToken = this.config.getAccessToken();
        if (accessToken != null) {
            httpURLConnection.setRequestProperty("Authorization", accessToken.getTokenType() + " " + accessToken.getValue());
        }
        if ((httpURLConnection instanceof HttpsURLConnection) && !this.config.isVerifySSLCert()) {
            try {
                HttpRequests.disableCertCheck((HttpsURLConnection) httpURLConnection);
            } catch (Exception ignored) {
                // Best effort: if the check cannot be disabled, the request proceeds with
                // normal certificate verification and fails there if the certificate is bad.
            }
        }

        try {
            int responseCode = httpURLConnection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
                throw new UnauthenticatedException("Unauthorized status code received from the server.");
            }
            if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
                throw new StreamNotFoundException(streamId);
            }
            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
                // Nothing to read in the requested range.
                return;
            }
            if (responseCode == HttpURLConnection.HTTP_FORBIDDEN) {
                // getErrorStream() returns null when the server sent no error body.
                InputStream errorStream = httpURLConnection.getErrorStream();
                String message = errorStream == null
                    ? "Forbidden status code received from the server."
                    : CharStreams.toString(new InputStreamReader(errorStream, Charsets.UTF_8));
                throw new UnauthorizedException(message);
            }

            // The response body is a JSON array of stream events; decode it one element at a time.
            InputStream inputStream = httpURLConnection.getInputStream();
            JsonReader jsonReader = new JsonReader(new InputStreamReader(inputStream, Charsets.UTF_8));
            jsonReader.beginArray();
            while (jsonReader.peek() != JsonToken.END_ARRAY) {
                StreamEvent event = GSON.fromJson(jsonReader, StreamEvent.class);
                Boolean keepReading = function.apply(event);
                if (keepReading == null || !keepReading) {
                    break;
                }
            }
            // Consume whatever is left (e.g. after an early stop) before disconnecting.
            drain(inputStream);
        } finally {
            // Single disconnect for every exit path; the reader is not closed separately.
            httpURLConnection.disconnect();
        }
    }

    /**
     * Posts an event body to the given URL and maps a 404 answer to
     * {@link StreamNotFoundException}.
     */
    private void writeEvent(URL url, StreamId streamId, String str) throws IOException, StreamNotFoundException, UnauthenticatedException, UnauthorizedException {
        HttpRequest request = HttpRequest.post(url).withBody(str).build();
        throwIfNotFound(
            this.restClient.execute(request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND),
            streamId);
    }

    /** Resolves {@code streams/<stream><suffix>} inside the namespace of the given stream. */
    private URL resolveStreamUrl(StreamId streamId, String suffix) throws IOException {
        return this.config.resolveNamespacedURLV3(streamId.getParent(), "streams/" + streamId.getStream() + suffix);
    }

    /** Throws {@link StreamNotFoundException} when the response status is 404. */
    private void throwIfNotFound(HttpResponse response, StreamId streamId) throws StreamNotFoundException {
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(streamId);
        }
    }

    /** Skips over the stream until a {@code skip} call reports that nothing more was skipped. */
    private void drain(InputStream inputStream) throws IOException {
        while (inputStream.skip(Long.MAX_VALUE) > 0) {
            // keep skipping
        }
    }
}
