/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.marketo;

import com.google.api.client.util.Lists;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.marketo.BatchSyncStats;
import io.castled.apps.connectors.marketo.MarketoAppConfig;
import io.castled.apps.connectors.marketo.MarketoAppSyncConfig;
import io.castled.apps.connectors.marketo.MarketoBulkClient;
import io.castled.apps.connectors.marketo.MarketoObject;
import io.castled.apps.connectors.marketo.MarketoSyncMode;
import io.castled.apps.connectors.marketo.MarketoUtils;
import io.castled.apps.connectors.marketo.dtos.GenericObjectSyncRequest;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.errors.errorclassifications.ExternallyCategorizedError;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledRuntimeException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MarketoGenericObjectSink
extends BufferedObjectSink<DataSinkMessage> {
    private static final long BATCH_REQUEST_NUM_MAX = 300L;
    private final MarketoBulkClient marketoClient;
    private final ErrorOutputStream errorOutputStream;
    private final MarketoAppSyncConfig syncConfig;
    private final List<String> primaryKeys;
    private final AppSyncStats syncStats;

    MarketoGenericObjectSink(DataSinkRequest dataSinkRequest) {
        this.marketoClient = new MarketoBulkClient((MarketoAppConfig)dataSinkRequest.getExternalApp().getConfig());
        this.errorOutputStream = dataSinkRequest.getErrorOutputStream();
        this.syncConfig = (MarketoAppSyncConfig)dataSinkRequest.getAppSyncConfig();
        this.primaryKeys = dataSinkRequest.getPrimaryKeys();
        this.syncStats = new AppSyncStats(0L, 0L, 0L);
    }

    @Override
    protected void writeRecords(List<DataSinkMessage> records) {
        GenericObjectSyncRequest request = this.constructSyncRequest(records);
        MarketoObject marketoObject = MarketoObject.getObjectByName(this.syncConfig.getObject().getObjectName());
        BatchSyncStats batchSyncStats = this.marketoClient.batchSyncObject(marketoObject, request);
        batchSyncStats.getErrors().forEach(errorRec -> this.errorOutputStream.writeFailedRecord((DataSinkMessage)records.get(errorRec.getMsgIdx()), new ExternallyCategorizedError(errorRec.getErrorCode(), errorRec.getMessage())));
        this.updateSyncStats(records.size(), ((DataSinkMessage)Iterables.getLast(records)).getOffset(), batchSyncStats.getSkipped());
    }

    @Override
    public long getMaxBufferedObjects() {
        return 300L;
    }

    public AppSyncStats getSyncStats() {
        return this.syncStats;
    }

    GenericObjectSyncRequest constructSyncRequest(List<DataSinkMessage> records) {
        GenericObjectSyncRequest request = new GenericObjectSyncRequest();
        request.setAction(this.getMarketoSyncMode());
        request.setDedupeBy(this.getDedupeKey(records));
        ArrayList input = Lists.newArrayList();
        for (DataSinkMessage msg : records) {
            HashMap inputRec = Maps.newHashMap();
            msg.getRecord().getFields().forEach(fieldRef -> inputRec.put((String)fieldRef.getParams().get("name"), MarketoUtils.formatValue(fieldRef.getValue(), fieldRef.getSchema())));
            input.add(inputRec);
        }
        request.setInput(input);
        return request;
    }

    private String getDedupeKey(List<DataSinkMessage> records) {
        String pkDisplayName = this.primaryKeys.stream().findFirst().orElse(null);
        DataSinkMessage msg = (DataSinkMessage)records.stream().findFirst().orElseThrow(() -> new CastledRuntimeException("Records empty!"));
        return (String)msg.getRecord().getField(pkDisplayName).getParams().get("internalName");
    }

    private String getMarketoSyncMode() {
        switch (this.syncConfig.getMode()) {
            case UPDATE: {
                return MarketoSyncMode.UPDATE.getName();
            }
            case INSERT: {
                return MarketoSyncMode.INSERT.getName();
            }
            case UPSERT: {
                return MarketoSyncMode.UPSERT.getName();
            }
        }
        throw new CastledRuntimeException(String.format("Invalid sync mode %s!", this.syncConfig.getMode().name()));
    }

    private void updateSyncStats(long processed, long offset, long recordsSkipped) {
        this.syncStats.setRecordsProcessed(processed + this.syncStats.getRecordsProcessed());
        this.syncStats.setOffset(Math.max(offset, this.syncStats.getOffset()));
        this.syncStats.setRecordsSkipped(recordsSkipped + this.syncStats.getRecordsSkipped());
    }
}

