package datahub.client.file;

import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.metadata.Constants;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.event.EventFormatter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.event.UpsertAspectRequest;
import datahub.shaded.jackson.annotation.JsonInclude;
import datahub.shaded.jackson.core.StreamReadConstraints;
import datahub.shaded.jackson.core.util.DefaultIndenter;
import datahub.shaded.jackson.core.util.DefaultPrettyPrinter;
import datahub.shaded.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.codehaus.plexus.util.SelectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:datahub/client/file/FileEmitter.class */
public class FileEmitter implements Emitter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(FileEmitter.class);
    private final EventFormatter eventFormatter;
    private final FileEmitterConfig config;
    private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
    private final JacksonDataTemplateCodec dataTemplateCodec;
    private final BufferedWriter writer;
    private final Future<MetadataWriteResponse> cachedSuccessFuture;
    private final AtomicBoolean closed;
    private boolean wroteSomething;
    private static final String INDENT_4 = "    ";

    public FileEmitter(final FileEmitterConfig fileEmitterConfig) {
        this.objectMapper.getFactory().setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.parseInt(System.getenv().getOrDefault(Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH, Constants.MAX_JACKSON_STRING_SIZE))).build());
        this.dataTemplateCodec = new JacksonDataTemplateCodec(this.objectMapper.getFactory());
        this.config = fileEmitterConfig;
        this.eventFormatter = this.config.getEventFormatter();
        this.dataTemplateCodec.setPrettyPrinter(new DefaultPrettyPrinter().withObjectIndenter(new DefaultIndenter(INDENT_4, DefaultIndenter.SYS_LF)).withArrayIndenter(new DefaultIndenter(INDENT_4, DefaultIndenter.SYS_LF)));
        try {
            this.writer = new BufferedWriter(new FileWriter(fileEmitterConfig.getFileName(), false));
            this.writer.append((CharSequence) SelectorUtils.PATTERN_HANDLER_PREFIX);
            this.writer.newLine();
            this.closed = new AtomicBoolean(false);
            this.wroteSomething = false;
            log.debug("Emitter created successfully for " + this.config.getFileName());
            this.cachedSuccessFuture = new Future<MetadataWriteResponse>() { // from class: datahub.client.file.FileEmitter.1
                @Override // java.util.concurrent.Future
                public boolean cancel(boolean z) {
                    return false;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Future
                public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
                    return MetadataWriteResponse.builder().success(true).responseContent("MCP witten to File " + fileEmitterConfig.getFileName()).build();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Future
                public MetadataWriteResponse get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                    return get();
                }

                @Override // java.util.concurrent.Future
                public boolean isCancelled() {
                    return false;
                }

                @Override // java.util.concurrent.Future
                public boolean isDone() {
                    return true;
                }
            };
        } catch (IOException e) {
            throw new RuntimeException("Error while creating file", e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.writer.newLine();
        this.writer.append((CharSequence) SelectorUtils.PATTERN_HANDLER_SUFFIX);
        this.writer.close();
        this.closed.set(true);
        log.debug("Emitter closed for {}", this.config.getFileName());
    }

    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(MetadataChangeProposalWrapper metadataChangeProposalWrapper, Callback callback) throws IOException {
        return emit(this.eventFormatter.convert(metadataChangeProposalWrapper), callback);
    }

    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(MetadataChangeProposal metadataChangeProposal, Callback callback) throws IOException {
        if (this.closed.get()) {
            log.error("File Emitter is already closed.");
            Future<MetadataWriteResponse> createFailureFuture = createFailureFuture("File Emitter is already closed.");
            if (callback != null) {
                callback.onFailure(new Exception("File Emitter is already closed."));
            }
            return createFailureFuture;
        }
        try {
            String mapToString = this.dataTemplateCodec.mapToString(metadataChangeProposal.data());
            if (this.wroteSomething) {
                this.writer.append((CharSequence) ",");
                this.writer.newLine();
            }
            this.writer.append((CharSequence) mapToString);
            this.wroteSomething = true;
            log.debug("MCP written successfully: {}", mapToString);
            Future<MetadataWriteResponse> future = this.cachedSuccessFuture;
            if (callback != null) {
                try {
                    callback.onCompletion(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    log.warn("Callback could not be executed.", e);
                }
            }
            return future;
        } catch (Throwable th) {
            Future<MetadataWriteResponse> createFailureFuture2 = createFailureFuture(th.getMessage());
            if (callback != null) {
                try {
                    callback.onFailure(th);
                } catch (Exception e2) {
                    log.warn("Callback could not be executed.", e2);
                }
            }
            return createFailureFuture2;
        }
    }

    @Override // datahub.client.Emitter
    public boolean testConnection() throws IOException, ExecutionException, InterruptedException {
        throw new UnsupportedOperationException("testConnection not relevant for File Emitter");
    }

    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> list, Callback callback) throws IOException {
        throw new UnsupportedOperationException("UpsertAspectRequest not relevant for File Emitter");
    }

    private Future<MetadataWriteResponse> createFailureFuture(final String str) {
        return new Future<MetadataWriteResponse>() { // from class: datahub.client.file.FileEmitter.2
            @Override // java.util.concurrent.Future
            public boolean cancel(boolean z) {
                return false;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Future
            public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
                return MetadataWriteResponse.builder().success(false).responseContent(str).build();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Future
            public MetadataWriteResponse get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                return get();
            }

            @Override // java.util.concurrent.Future
            public boolean isCancelled() {
                return false;
            }

            @Override // java.util.concurrent.Future
            public boolean isDone() {
                return true;
            }
        };
    }
}
