package datahub.client.rest;

import com.linkedin.data.DataMap;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.metadata.Constants;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataResponseFuture;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitterConfig;
import datahub.event.EventFormatter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.event.UpsertAspectRequest;
import io.acryl.shaded.com.google.common.annotations.VisibleForTesting;
import io.acryl.shaded.http.client5.http.async.methods.SimpleHttpResponse;
import io.acryl.shaded.http.client5.http.async.methods.SimpleRequestBuilder;
import io.acryl.shaded.http.client5.http.config.RequestConfig;
import io.acryl.shaded.http.client5.http.config.TlsConfig;
import io.acryl.shaded.http.client5.http.impl.async.CloseableHttpAsyncClient;
import io.acryl.shaded.http.client5.http.impl.async.HttpAsyncClientBuilder;
import io.acryl.shaded.http.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import io.acryl.shaded.http.client5.http.ssl.ClientTlsStrategyBuilder;
import io.acryl.shaded.http.client5.http.ssl.NoopHostnameVerifier;
import io.acryl.shaded.http.client5.http.ssl.TrustAllStrategy;
import io.acryl.shaded.http.core5.concurrent.FutureCallback;
import io.acryl.shaded.http.core5.http.ContentType;
import io.acryl.shaded.http.core5.http2.HttpVersionPolicy;
import io.acryl.shaded.http.core5.ssl.SSLContexts;
import io.acryl.shaded.http.core5.util.TimeValue;
import io.acryl.shaded.jackson.annotation.JsonInclude;
import io.acryl.shaded.jackson.core.StreamReadConstraints;
import io.acryl.shaded.jackson.databind.ObjectMapper;
import io.acryl.shaded.javax.annotation.concurrent.ThreadSafe;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:datahub/client/rest/RestEmitter.class */
public class RestEmitter implements Emitter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RestEmitter.class);
    private final RestEmitterConfig config;
    private final String ingestProposalUrl;
    private final String ingestOpenApiUrl;
    private final String configUrl;
    private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
    private final JacksonDataTemplateCodec dataTemplateCodec;
    private final CloseableHttpAsyncClient httpClient;
    private final EventFormatter eventFormatter;

    public RestEmitter(RestEmitterConfig restEmitterConfig) {
        this.objectMapper.getFactory().setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.parseInt(System.getenv().getOrDefault(Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH, Constants.MAX_JACKSON_STRING_SIZE))).build());
        this.dataTemplateCodec = new JacksonDataTemplateCodec(this.objectMapper.getFactory());
        this.config = restEmitterConfig;
        HttpAsyncClientBuilder asyncHttpClientBuilder = this.config.getAsyncHttpClientBuilder();
        asyncHttpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy());
        if (restEmitterConfig.getTimeoutSec() != null || restEmitterConfig.isDisableChunkedEncoding()) {
            RequestConfig.Builder custom = RequestConfig.custom();
            if (restEmitterConfig.getTimeoutSec() != null) {
                custom.setConnectionRequestTimeout(restEmitterConfig.getTimeoutSec().intValue() * 1000, TimeUnit.MILLISECONDS).setResponseTimeout(restEmitterConfig.getTimeoutSec().intValue() * 1000, TimeUnit.MILLISECONDS);
            }
            if (restEmitterConfig.isDisableChunkedEncoding()) {
                custom.setContentCompressionEnabled(false);
            }
            asyncHttpClientBuilder.setDefaultRequestConfig(custom.build());
        }
        PoolingAsyncClientConnectionManagerBuilder create = PoolingAsyncClientConnectionManagerBuilder.create();
        create.setDefaultTlsConfig(TlsConfig.copy(TlsConfig.DEFAULT).setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1).build());
        if (restEmitterConfig.isDisableSslVerification()) {
            try {
                create.setTlsStrategy(ClientTlsStrategyBuilder.create().setSslContext(SSLContexts.custom().loadTrustMaterial(TrustAllStrategy.INSTANCE).build()).setHostnameVerifier(NoopHostnameVerifier.INSTANCE).build());
            } catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
                throw new RuntimeException("Error while creating insecure http client", e);
            }
        }
        asyncHttpClientBuilder.setConnectionManager(create.build());
        asyncHttpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy(restEmitterConfig.getMaxRetries(), TimeValue.ofSeconds(restEmitterConfig.getRetryIntervalSec())));
        this.httpClient = asyncHttpClientBuilder.build();
        this.httpClient.start();
        this.ingestProposalUrl = this.config.getServer() + "/aspects?action=ingestProposal";
        this.ingestOpenApiUrl = restEmitterConfig.getServer() + "/openapi/entities/v1/";
        this.configUrl = this.config.getServer() + "/config";
        this.eventFormatter = this.config.getEventFormatter();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static MetadataWriteResponse mapResponse(SimpleHttpResponse simpleHttpResponse) {
        MetadataWriteResponse.MetadataWriteResponseBuilder underlyingResponse = MetadataWriteResponse.builder().underlyingResponse(simpleHttpResponse);
        if ((simpleHttpResponse == null || simpleHttpResponse.getCode() != 200) && ((SimpleHttpResponse) Objects.requireNonNull(simpleHttpResponse)).getCode() != 201) {
            underlyingResponse.success(false);
        } else {
            underlyingResponse.success(true);
        }
        try {
            new ByteArrayOutputStream();
            underlyingResponse.responseContent(simpleHttpResponse.getBody().getBodyText());
        } catch (Exception e) {
            log.warn("Wasn't able to convert response into a string", e);
        }
        return underlyingResponse.build();
    }

    public static RestEmitter create(Consumer<RestEmitterConfig.RestEmitterConfigBuilder> consumer) {
        return new RestEmitter(RestEmitterConfig.builder().with(consumer).build());
    }

    public static RestEmitter createWithDefaults() {
        return create(restEmitterConfigBuilder -> {
        });
    }

    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(MetadataChangeProposalWrapper metadataChangeProposalWrapper, Callback callback) throws IOException {
        return emit(this.eventFormatter.convert(metadataChangeProposalWrapper), callback);
    }

    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(MetadataChangeProposal metadataChangeProposal, Callback callback) throws IOException {
        DataMap dataMap = new DataMap();
        dataMap.put("proposal", metadataChangeProposal.data());
        String mapToString = this.dataTemplateCodec.mapToString(dataMap);
        log.debug("Emit: URL: {}, Payload: {}\n", this.ingestProposalUrl, mapToString);
        return postGeneric(this.ingestProposalUrl, mapToString, metadataChangeProposal, callback);
    }

    private Future<MetadataWriteResponse> postGeneric(String str, String str2, Object obj, final Callback callback) throws IOException {
        SimpleRequestBuilder post = SimpleRequestBuilder.post(str);
        post.setHeader("Content-Type", "application/json");
        post.setHeader("X-RestLi-Protocol-Version", "2.0.0");
        post.setHeader("Accept", "application/json");
        Map<String, String> extraHeaders = this.config.getExtraHeaders();
        Objects.requireNonNull(post);
        extraHeaders.forEach(post::setHeader);
        if (this.config.getToken() != null) {
            post.setHeader("Authorization", "Bearer " + this.config.getToken());
        }
        if (this.config.isDisableChunkedEncoding()) {
            post.setBody(str2.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
        } else {
            post.setBody(str2, ContentType.APPLICATION_JSON);
        }
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        return new MetadataResponseFuture(this.httpClient.execute(post.build(), new FutureCallback<SimpleHttpResponse>() { // from class: datahub.client.rest.RestEmitter.1
            @Override // io.acryl.shaded.http.core5.concurrent.FutureCallback
            public void completed(SimpleHttpResponse simpleHttpResponse) {
                MetadataWriteResponse metadataWriteResponse = null;
                try {
                    metadataWriteResponse = RestEmitter.mapResponse(simpleHttpResponse);
                    atomicReference.set(metadataWriteResponse);
                } catch (Exception e) {
                }
                countDownLatch.countDown();
                if (callback != null) {
                    try {
                        callback.onCompletion(metadataWriteResponse);
                    } catch (Exception e2) {
                        RestEmitter.log.error("Error executing user callback on completion.", e2);
                    }
                }
            }

            @Override // io.acryl.shaded.http.core5.concurrent.FutureCallback
            public void failed(Exception exc) {
                countDownLatch.countDown();
                if (callback != null) {
                    try {
                        callback.onFailure(exc);
                    } catch (Exception e) {
                        RestEmitter.log.error("Error executing user callback on failure.", e);
                    }
                }
            }

            @Override // io.acryl.shaded.http.core5.concurrent.FutureCallback
            public void cancelled() {
                countDownLatch.countDown();
                if (callback != null) {
                    try {
                        callback.onFailure(new RuntimeException("Cancelled"));
                    } catch (Exception e) {
                        RestEmitter.log.error("Error executing user callback on failure due to cancellation.", e);
                    }
                }
            }
        }), atomicReference, countDownLatch);
    }

    private Future<MetadataWriteResponse> getGeneric(String str) throws IOException {
        return new MetadataResponseFuture(this.httpClient.execute(SimpleRequestBuilder.get(str).addHeader("Content-Type", "application/json").addHeader("X-RestLi-Protocol-Version", "2.0.0").addHeader("Accept", "application/json").build(), null), RestEmitter::mapResponse);
    }

    @Override // datahub.client.Emitter
    public boolean testConnection() throws IOException, ExecutionException, InterruptedException {
        return getGeneric(this.configUrl).get().isSuccess();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.httpClient.close();
    }

    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> list, Callback callback) throws IOException {
        log.debug("Emit: URL: {}, Payload: {}\n", this.ingestOpenApiUrl, list);
        return postOpenAPI(list, callback);
    }

    private Future<MetadataWriteResponse> postOpenAPI(List<UpsertAspectRequest> list, final Callback callback) throws IOException {
        SimpleRequestBuilder addHeader = SimpleRequestBuilder.post(this.ingestOpenApiUrl).addHeader("Content-Type", "application/json").addHeader("Accept", "application/json").addHeader("X-RestLi-Protocol-Version", "2.0.0");
        Map<String, String> extraHeaders = this.config.getExtraHeaders();
        Objects.requireNonNull(addHeader);
        extraHeaders.forEach(addHeader::addHeader);
        if (this.config.getToken() != null) {
            addHeader.addHeader("Authorization", "Bearer " + this.config.getToken());
        }
        addHeader.setBody(this.objectMapper.writeValueAsString(list), ContentType.APPLICATION_JSON);
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        return new MetadataResponseFuture(this.httpClient.execute(addHeader.build(), new FutureCallback<SimpleHttpResponse>() { // from class: datahub.client.rest.RestEmitter.2
            @Override // io.acryl.shaded.http.core5.concurrent.FutureCallback
            public void completed(SimpleHttpResponse simpleHttpResponse) {
                MetadataWriteResponse metadataWriteResponse = null;
                try {
                    metadataWriteResponse = RestEmitter.mapResponse(simpleHttpResponse);
                    atomicReference.set(metadataWriteResponse);
                } catch (Exception e) {
                }
                countDownLatch.countDown();
                if (callback != null) {
                    try {
                        callback.onCompletion(metadataWriteResponse);
                    } catch (Exception e2) {
                        RestEmitter.log.error("Error executing user callback on completion.", e2);
                    }
                }
            }

            @Override // io.acryl.shaded.http.core5.concurrent.FutureCallback
            public void failed(Exception exc) {
                countDownLatch.countDown();
                if (callback != null) {
                    try {
                        callback.onFailure(exc);
                    } catch (Exception e) {
                        RestEmitter.log.error("Error executing user callback on failure.", e);
                    }
                }
            }

            @Override // io.acryl.shaded.http.core5.concurrent.FutureCallback
            public void cancelled() {
                countDownLatch.countDown();
                if (callback != null) {
                    try {
                        callback.onFailure(new RuntimeException("Cancelled"));
                    } catch (Exception e) {
                        RestEmitter.log.error("Error executing user callback on failure due to cancellation.", e);
                    }
                }
            }
        }), atomicReference, countDownLatch);
    }

    @VisibleForTesting
    CloseableHttpAsyncClient getHttpClient() {
        return this.httpClient;
    }
}
