package co.cask.cdap.app.stream;

import co.cask.cdap.api.data.stream.StreamBatchWriter;
import co.cask.cdap.api.data.stream.StreamWriter;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.http.DefaultHttpRequestConfig;
import co.cask.cdap.common.id.Id;
import co.cask.cdap.common.internal.remote.RemoteClient;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.data2.registry.UsageWriter;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.security.spi.authentication.AuthenticationContext;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.twill.discovery.DiscoveryServiceClient;

/* loaded from: input_file:co/cask/cdap/app/stream/DefaultStreamWriter.class */
/**
 * {@link StreamWriter} implementation that sends write requests through a {@link RemoteClient}
 * created for the {@code "streams"} service, using the base path
 * {@code /v3/namespaces/<namespace>/streams/}.
 *
 * <p>Besides issuing the HTTP requests, every write also records stream usage for the configured
 * owners (through {@link UsageWriter}) and a {@link AccessType#WRITE} lineage access for the
 * program run (through {@link LineageWriter}).
 */
public class DefaultStreamWriter implements StreamWriter {
    /** Request header that carries the principal name when authorization is enabled. */
    private static final String USER_ID_HEADER = "CDAP-UserId";
    /** Request header that carries the content type of batch uploads. */
    private static final String CONTENT_TYPE_HEADER = "Content-Type";
    /** Path suffix appended to the stream name for batch requests. */
    private static final String BATCH_PATH_SUFFIX = "/batch";
    /** Connect and read timeout, in milliseconds, applied to batch-writer connections. */
    private static final int BATCH_TIMEOUT_MS = 15000;

    /** Streams for which {@link UsageWriter#registerAll} has already been invoked by this writer. */
    private final ConcurrentMap<StreamId, Boolean> isStreamRegistered = Maps.newConcurrentMap();
    private final UsageWriter usageWriter;
    private final NamespaceId namespace;
    private final Iterable<? extends EntityId> owners;
    private final ProgramRunId run;
    private final LineageWriter lineageWriter;
    private final AuthenticationContext authenticationContext;
    private final boolean authorizationEnabled;
    private final RetryStrategy retryStrategy;
    private final RemoteClient remoteClient;

    /**
     * Creates a writer bound to the namespace of the given program run.
     *
     * @param run the program run on whose behalf streams are written; its namespace determines
     *     the namespace of every stream this writer addresses
     * @param iterable the owner entities registered as users of each written stream
     * @param retryStrategy the strategy handed to {@link Retries#callWithRetries} for each request
     * @param usageWriter receives the owner/stream usage registrations
     * @param lineageWriter receives a write access record for each stream access
     * @param discoveryServiceClient passed to the {@link RemoteClient} used for all requests
     * @param authenticationContext supplies the principal name sent when authorization is enabled
     * @param cConfiguration read once for the {@code security.authorization.enabled} flag
     */
    @Inject
    public DefaultStreamWriter(@Assisted("run") Id.Run run, @Assisted("owners") Iterable<? extends EntityId> iterable, @Assisted("retryStrategy") RetryStrategy retryStrategy, UsageWriter usageWriter, LineageWriter lineageWriter, DiscoveryServiceClient discoveryServiceClient, AuthenticationContext authenticationContext, CConfiguration cConfiguration) {
        this.run = run.toEntityId();
        this.namespace = run.getNamespace().toEntityId();
        this.owners = iterable;
        this.lineageWriter = lineageWriter;
        this.usageWriter = usageWriter;
        this.authenticationContext = authenticationContext;
        this.authorizationEnabled = cConfiguration.getBoolean("security.authorization.enabled");
        this.retryStrategy = retryStrategy;
        this.remoteClient = new RemoteClient(discoveryServiceClient, "streams", new DefaultHttpRequestConfig(false), String.format("%s/namespaces/%s/streams/", "/v3", this.namespace.getNamespace()));
    }

    /**
     * Executes a prepared stream request and translates the response status into an outcome.
     *
     * <p>Usage and lineage are recorded for every response other than 404, i.e. before the
     * success check, so a rejected write to an existing stream is still recorded.
     *
     * @param streamId the stream the request targets; used for registration and error messages
     * @param builder the request to send; the user-id header is added here when authorization
     *     is enabled
     * @throws IOException if the stream service answers 404 or any status outside 200-299, or if
     *     executing the request fails
     */
    public void writeToStream(StreamId streamId, HttpRequest.Builder builder) throws IOException {
        if (authorizationEnabled) {
            builder.addHeader(USER_ID_HEADER, authenticationContext.getPrincipal().getName());
        }
        int responseCode = remoteClient.execute(builder.build()).getResponseCode();
        if (responseCode == HttpResponseStatus.NOT_FOUND.code()) {
            throw new IOException(String.format("Stream %s not found", streamId));
        }
        registerStream(streamId);
        if (responseCode < 200 || responseCode >= 300) {
            throw new IOException(String.format("Writing to Stream %s did not succeed. Stream Service ResponseCode : %d", streamId, responseCode));
        }
    }

    /**
     * Posts a single event body to the named stream, retrying according to the retry strategy.
     *
     * @param str the stream name; also used as the prefix of every event header name
     * @param byteBuffer the event body
     * @param map event headers; each is sent as an HTTP header named {@code <stream>.<key>}
     * @throws IOException if the write fails
     */
    private void write(final String str, final ByteBuffer byteBuffer, final Map<String, String> map) throws IOException {
        Retries.callWithRetries(new Retries.Callable<Void, IOException>() {
            @Override
            public Void call() throws IOException {
                HttpRequest.Builder request = remoteClient.requestBuilder(HttpMethod.POST, str).withBody(byteBuffer);
                for (Map.Entry<String, String> header : map.entrySet()) {
                    request.addHeader(str + "." + header.getKey(), header.getValue());
                }
                writeToStream(namespace.stream(str), request);
                return null;
            }
        }, retryStrategy);
    }

    /**
     * Writes a string event, encoded as UTF-8, with no headers.
     *
     * @param str the stream name
     * @param str2 the event body
     * @throws IOException if the write fails
     */
    public void write(String str, String str2) throws IOException {
        write(str, str2, ImmutableMap.<String, String>of());
    }

    /**
     * Writes a string event, encoded as UTF-8, with the given headers.
     *
     * @param str the stream name
     * @param str2 the event body
     * @param map the event headers
     * @throws IOException if the write fails
     */
    public void write(String str, String str2, Map<String, String> map) throws IOException {
        write(str, Charsets.UTF_8.encode(str2), map);
    }

    /**
     * Writes a binary event with no headers.
     *
     * @param str the stream name
     * @param byteBuffer the event body
     * @throws IOException if the write fails
     */
    public void write(String str, ByteBuffer byteBuffer) throws IOException {
        write(str, byteBuffer, ImmutableMap.<String, String>of());
    }

    /**
     * Writes the body and headers carried by the given event data.
     *
     * @param str the stream name
     * @param streamEventData supplies the event body and headers
     * @throws IOException if the write fails
     */
    public void write(String str, StreamEventData streamEventData) throws IOException {
        write(str, (ByteBuffer) streamEventData.getBody(), streamEventData.getHeaders());
    }

    /**
     * Posts the content of a file to the stream's batch endpoint, retrying according to the
     * retry strategy.
     *
     * @param str the stream name
     * @param file the file used as the request body
     * @param str2 the value sent in the {@code Content-Type} header
     * @throws IOException if the write fails
     */
    public void writeFile(final String str, final File file, final String str2) throws IOException {
        Retries.callWithRetries(new Retries.Callable<Void, IOException>() {
            @Override
            public Void call() throws IOException {
                HttpRequest.Builder request = remoteClient.requestBuilder(HttpMethod.POST, str + BATCH_PATH_SUFFIX)
                    .withBody(file)
                    .addHeader(CONTENT_TYPE_HEADER, str2);
                writeToStream(namespace.stream(str), request);
                return null;
            }
        }, retryStrategy);
    }

    /**
     * Opens a chunked POST connection to the stream's batch endpoint and wraps it in a
     * {@link StreamBatchWriter}.
     *
     * <p>Only resolving the endpoint URL goes through the retry strategy; the connection itself
     * is attempted once. If anything fails after the connection object has been obtained, the
     * connection is disconnected before the failure propagates.
     *
     * @param str the stream name
     * @param str2 the value sent in the {@code Content-Type} header
     * @return a batch writer backed by the opened connection
     * @throws IOException if the URL cannot be resolved, the connection cannot be established,
     *     or the batch writer cannot be created
     */
    public StreamBatchWriter createBatchWriter(final String str, String str2) throws IOException {
        URL batchUrl = Retries.callWithRetries(new Retries.Callable<URL, IOException>() {
            @Override
            public URL call() throws IOException {
                return remoteClient.resolve(str + BATCH_PATH_SUFFIX);
            }
        }, retryStrategy);

        HttpURLConnection connection = (HttpURLConnection) batchUrl.openConnection();
        // Track success explicitly so that unchecked exceptions (e.g. from the usage or lineage
        // writers) release the connection too, not just IOExceptions.
        boolean created = false;
        try {
            connection.setRequestMethod(HttpMethod.POST.name());
            connection.setReadTimeout(BATCH_TIMEOUT_MS);
            connection.setConnectTimeout(BATCH_TIMEOUT_MS);
            connection.setRequestProperty(CONTENT_TYPE_HEADER, str2);
            if (authorizationEnabled) {
                connection.setRequestProperty(USER_ID_HEADER, authenticationContext.getPrincipal().getName());
            }
            connection.setDoOutput(true);
            connection.setChunkedStreamingMode(0);
            connection.connect();

            StreamId streamId = namespace.stream(str);
            registerStream(streamId);
            StreamBatchWriter batchWriter = new DefaultStreamBatchWriter(connection, streamId);
            created = true;
            return batchWriter;
        } finally {
            if (!created) {
                connection.disconnect();
            }
        }
    }

    /**
     * Records that this writer's owners use the stream and that the run wrote to it.
     *
     * @param streamId the stream being accessed
     */
    private void registerStream(StreamId streamId) {
        // The check and the put are not atomic, so concurrent callers may each invoke
        // registerAll for the same stream; the flag is only set after registerAll returns.
        if (!isStreamRegistered.containsKey(streamId)) {
            usageWriter.registerAll(owners, streamId);
            isStreamRegistered.put(streamId, true);
        }
        // Lineage is recorded on every call, not just the first one per stream.
        lineageWriter.addAccess(run, streamId, AccessType.WRITE);
    }
}
