/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.marketo;

import com.google.api.client.util.Lists;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.marketo.BatchSyncStats;
import io.castled.apps.connectors.marketo.MarketoAppConfig;
import io.castled.apps.connectors.marketo.MarketoAppSyncConfig;
import io.castled.apps.connectors.marketo.MarketoBulkClient;
import io.castled.apps.connectors.marketo.MarketoSyncError;
import io.castled.apps.connectors.marketo.MarketoSyncMode;
import io.castled.apps.connectors.marketo.MarketoUtils;
import io.castled.apps.connectors.marketo.dtos.BatchLeadUpdateRequest;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.errors.errorclassifications.ExternallyCategorizedError;
import io.castled.commons.models.AppSyncMode;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.functionalinterfaces.ThrowingConsumer;
import io.castled.schema.models.Tuple;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarketoLeadSink
extends BufferedObjectSink<DataSinkMessage> {
    private static final Logger log = LoggerFactory.getLogger(MarketoLeadSink.class);
    private static final long BULK_REQUEST_BYTES_SIZE_MAX = 0xA00000L;
    private static final long BULK_REQUEST_NUM_MAX = 30000L;
    private static final long BATCH_REQUEST_NUM_MAX = 300L;
    private final MarketoBulkClient marketoBulkClient;
    private final ErrorOutputStream errorOutputStream;
    private final MarketoAppSyncConfig syncConfig;
    private final AppSyncStats syncStats;
    private final String pkDisplayName;
    private final List<String> mappedFields;

    public MarketoLeadSink(DataSinkRequest dataSinkRequest) {
        this.marketoBulkClient = new MarketoBulkClient((MarketoAppConfig)dataSinkRequest.getExternalApp().getConfig());
        this.errorOutputStream = dataSinkRequest.getErrorOutputStream();
        this.syncConfig = (MarketoAppSyncConfig)dataSinkRequest.getAppSyncConfig();
        this.syncStats = new AppSyncStats(0L, 0L, 0L);
        if (CollectionUtils.size(dataSinkRequest.getPrimaryKeys()) > 1) {
            log.error("Only 1 primary key allowed, we have more => " + dataSinkRequest.getPrimaryKeys());
        }
        this.pkDisplayName = (String)dataSinkRequest.getPrimaryKeys().stream().findFirst().get();
        this.mappedFields = dataSinkRequest.getMappedFields();
    }

    @Override
    protected void writeRecords(List<DataSinkMessage> msgs) {
        List<MarketoSyncError> upsertErrors;
        long skipped = 0L;
        if (this.syncConfig.getMode() == AppSyncMode.UPSERT) {
            ByteArrayOutputStream leadsStream = this.constructLeadFormData(msgs);
            String primaryKey = this.getPrimaryKeyName(this.pkDisplayName, ((DataSinkMessage)msgs.stream().findFirst().get()).getRecord());
            upsertErrors = this.marketoBulkClient.bulkUploadLeads(leadsStream, primaryKey, msgs.size());
        } else if (this.syncConfig.getMode() == AppSyncMode.UPDATE) {
            BatchLeadUpdateRequest request = this.constructLeadUpdateRequest(msgs);
            BatchSyncStats syncStats = this.marketoBulkClient.batchUpdateLeads(request);
            upsertErrors = syncStats.getErrors();
            skipped = syncStats.getSkipped();
        } else {
            throw new CastledRuntimeException(String.format("Invalid sync mode %s", new Object[]{this.syncConfig.getMode()}));
        }
        upsertErrors.forEach(errorRec -> this.errorOutputStream.writeFailedRecord((DataSinkMessage)msgs.get(errorRec.getMsgIdx()), new ExternallyCategorizedError(errorRec.getErrorCode(), errorRec.getMessage())));
        this.updateStats(msgs.size(), ((DataSinkMessage)Iterables.getLast(msgs)).getOffset(), skipped);
    }

    @Override
    public long getMaxBufferedObjects() {
        switch (this.syncConfig.getMode()) {
            case UPDATE: {
                return 300L;
            }
            case UPSERT: {
                return 30000L;
            }
        }
        throw new CastledRuntimeException(String.format("Invalid sync mode %s!", this.syncConfig.getMode().name()));
    }

    private ByteArrayOutputStream constructLeadFormData(List<DataSinkMessage> msgs) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BufferedWriter baosWriter = new BufferedWriter(new OutputStreamWriter(baos));
        Tuple rec = ((DataSinkMessage)msgs.stream().findFirst().get()).getRecord();
        List restHeader = this.mappedFields.stream().map(fieldName -> (String)rec.getField(fieldName).getParams().get("name")).collect(Collectors.toList());
        List header = rec.getFields().stream().map(fieldRef -> fieldRef.getName()).collect(Collectors.toList());
        try {
            CSVPrinter csvPrinter = new CSVPrinter((Appendable)baosWriter, CSVFormat.DEFAULT.withHeader((String[])restHeader.stream().toArray(String[]::new)));
            ThrowingConsumer writeTuple = tuple -> csvPrinter.printRecord(header.stream().map(fieldName -> MarketoUtils.formatValue(tuple.getValue(fieldName), tuple.getField(fieldName).getSchema())).collect(Collectors.toList()).toArray());
            for (DataSinkMessage msgRef : msgs) {
                writeTuple.accept((Object)msgRef.getRecord());
            }
            csvPrinter.close();
        }
        catch (Exception e) {
            throw new CastledRuntimeException((Throwable)e);
        }
        return baos;
    }

    BatchLeadUpdateRequest constructLeadUpdateRequest(List<DataSinkMessage> records) {
        BatchLeadUpdateRequest request = new BatchLeadUpdateRequest();
        request.setAction(this.getMarketoSyncMode());
        request.setLookupField(this.getDedupeKey(records));
        ArrayList input = Lists.newArrayList();
        for (DataSinkMessage msg : records) {
            HashMap inputRec = Maps.newHashMap();
            msg.getRecord().getFields().forEach(fieldRef -> inputRec.put((String)fieldRef.getParams().get("name"), MarketoUtils.formatValue(fieldRef.getValue(), fieldRef.getSchema())));
            input.add(inputRec);
        }
        request.setInput(input);
        return request;
    }

    private String getDedupeKey(List<DataSinkMessage> records) {
        DataSinkMessage msg = (DataSinkMessage)records.stream().findFirst().orElseThrow(() -> new CastledRuntimeException("Records empty!"));
        return (String)msg.getRecord().getField(this.pkDisplayName).getParams().get("name");
    }

    private String getMarketoSyncMode() {
        switch (this.syncConfig.getMode()) {
            case UPDATE: {
                return MarketoSyncMode.UPDATE.getName();
            }
            case INSERT: {
                return MarketoSyncMode.INSERT.getName();
            }
            case UPSERT: {
                return MarketoSyncMode.UPSERT.getName();
            }
        }
        throw new CastledRuntimeException(String.format("Invalid sync mode %s!", this.syncConfig.getMode().name()));
    }

    private String getPrimaryKeyName(String displayName, Tuple tuple) {
        return this.mappedFields.stream().filter(fieldName -> displayName.equals(fieldName)).map(fieldName -> (String)tuple.getField(fieldName).getParams().get("name")).findFirst().get();
    }

    private void updateStats(long processed, long maxOffset, long skipped) {
        this.syncStats.setRecordsProcessed(this.syncStats.getRecordsProcessed() + processed);
        this.syncStats.setOffset(Math.max(this.syncStats.getOffset(), maxOffset));
        this.syncStats.setRecordsSkipped(this.syncStats.getRecordsSkipped() + skipped);
    }

    public AppSyncStats getSyncStats() {
        return this.syncStats;
    }
}

