package co.cask.cdap.app.stream;

import co.cask.cdap.api.data.stream.StreamBatchWriter;
import co.cask.cdap.api.data.stream.StreamWriter;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.common.discovery.EndpointStrategy;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.proto.Id;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;

/* loaded from: input_file:co/cask/cdap/app/stream/DefaultStreamWriter.class */
public class DefaultStreamWriter implements StreamWriter {
    private final String namespaceId;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final EndpointStrategy endpointStrategy;

    public DefaultStreamWriter(String str, DiscoveryServiceClient discoveryServiceClient) {
        this.namespaceId = str;
        this.discoveryServiceClient = discoveryServiceClient;
        this.endpointStrategy = new RandomEndpointStrategy(discoveryServiceClient.discover("streams"));
    }

    private URL getStreamURL(String str) throws IOException {
        return getStreamURL(str, false);
    }

    private URL getStreamURL(String str, boolean z) throws IOException {
        Discoverable pick = this.endpointStrategy.pick(1L, TimeUnit.SECONDS);
        if (pick == null) {
            throw new IOException("Stream Service Endpoint not found");
        }
        InetSocketAddress socketAddress = pick.getSocketAddress();
        String format = String.format("http://%s:%d%s/namespaces/%s/streams/%s", socketAddress.getHostName(), Integer.valueOf(socketAddress.getPort()), "/v3", this.namespaceId, str);
        if (z) {
            format = String.format("%s/batch", format);
        }
        return new URL(format);
    }

    private void writeToStream(Id.Stream stream, HttpRequest httpRequest) throws IOException {
        int responseCode = HttpRequests.execute(httpRequest).getResponseCode();
        if (responseCode == HttpResponseStatus.NOT_FOUND.code()) {
            throw new IOException(String.format("Stream %s not found", stream));
        }
        if (responseCode < 200 || responseCode >= 300) {
            throw new IOException(String.format("Writing to Stream %s did not succeed. Stream Service ResponseCode : %d", stream, Integer.valueOf(responseCode)));
        }
    }

    private void write(String str, ByteBuffer byteBuffer, Map<String, String> map) throws IOException {
        writeToStream(Id.Stream.from(this.namespaceId, str), HttpRequest.post(getStreamURL(str)).withBody(byteBuffer).addHeaders(map).build());
    }

    public void write(String str, String str2) throws IOException {
        write(str, str2, (Map<String, String>) ImmutableMap.of());
    }

    public void write(String str, String str2, Map<String, String> map) throws IOException {
        write(str, Charsets.UTF_8.encode(str2), map);
    }

    public void write(String str, ByteBuffer byteBuffer) throws IOException {
        write(str, byteBuffer, (Map<String, String>) ImmutableMap.of());
    }

    public void write(String str, StreamEventData streamEventData) throws IOException {
        write(str, (ByteBuffer) streamEventData.getBody(), streamEventData.getHeaders());
    }

    public void writeFile(String str, File file, String str2) throws IOException {
        writeToStream(Id.Stream.from(this.namespaceId, str), HttpRequest.post(getStreamURL(str, true)).withBody(file).addHeader("Content-Type", str2).build());
    }

    public StreamBatchWriter createBatchWriter(String str, String str2) throws IOException {
        HttpURLConnection httpURLConnection = (HttpURLConnection) getStreamURL(str, true).openConnection();
        httpURLConnection.setRequestMethod(HttpMethod.POST.name());
        httpURLConnection.setReadTimeout(15000);
        httpURLConnection.setConnectTimeout(15000);
        httpURLConnection.setRequestProperty("Content-Type", str2);
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setChunkedStreamingMode(0);
        httpURLConnection.connect();
        try {
            return new DefaultStreamBatchWriter(httpURLConnection, Id.Stream.from(this.namespaceId, str));
        } catch (IOException e) {
            httpURLConnection.disconnect();
            throw e;
        }
    }
}
