/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.sendgrid;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.sendgrid.SendgridAppConfig;
import io.castled.apps.connectors.sendgrid.SendgridAppSyncConfig;
import io.castled.apps.connectors.sendgrid.SendgridErrorParser;
import io.castled.apps.connectors.sendgrid.SendgridRequestFormatterUtils;
import io.castled.apps.connectors.sendgrid.SendgridRestClient;
import io.castled.apps.connectors.sendgrid.SendgridUpsertError;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.schema.models.Message;
import io.castled.schema.models.Tuple;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SendgridContactSink
extends BufferedObjectSink<Message> {
    private static final long UPSERT_BATCH_NUM_SIZE_MAX = 30000L;
    private static final long UPSERT_BATCH_BYTES_SIZE_MAX = 0x600000L;
    private static final long UPSERT_BATCH_NUM_SIZE_MAX_CHOSEN = 30000L;
    private final SendgridRestClient sendgridRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final SendgridAppSyncConfig syncConfig;
    private final SendgridErrorParser errorParser;
    private final AppSyncStats syncStats;

    public SendgridContactSink(SendgridAppConfig sendgridAppConfig, SendgridAppSyncConfig syncConfig, ErrorOutputStream errorOutputStream) {
        this.sendgridRestClient = new SendgridRestClient(sendgridAppConfig);
        this.errorOutputStream = errorOutputStream;
        this.errorParser = (SendgridErrorParser)ObjectRegistry.getInstance(SendgridErrorParser.class);
        this.syncConfig = syncConfig;
        this.syncStats = new AppSyncStats(0L, 0L, 0L);
    }

    @Override
    protected void writeRecords(List<Message> msgs) {
        List<Map<String, Object>> contacts = msgs.stream().map(msgRef -> this.constructContactProperties(msgRef.getRecord())).collect(Collectors.toList());
        List<SendgridUpsertError> upsertErrors = this.sendgridRestClient.upsertContacts(contacts, this.syncConfig.getListIds());
        Map emailRecordMap = msgs.stream().collect(Collectors.toMap(message -> this.getEmail(message.getRecord()), Function.identity()));
        upsertErrors.forEach(error -> this.errorOutputStream.writeFailedRecord((Message)emailRecordMap.get(error.getEmail()), this.errorParser.getPipelineError((SendgridUpsertError)error)));
        this.updateStats(msgs.size(), ((Message)Iterables.getLast(msgs)).getOffset());
    }

    @Override
    public long getMaxBufferedObjects() {
        return 30000L;
    }

    private String getEmail(Tuple record) {
        return (String)record.getValue("email");
    }

    private Map<String, Object> constructContactProperties(Tuple record) {
        String CUSTOM_TAG = "custom";
        HashMap reservedProperties = Maps.newHashMap();
        record.getFields().stream().filter(fieldRef -> (Boolean)fieldRef.getParams().get("custom") == false).forEach(fieldRef -> reservedProperties.put(fieldRef.getName(), SendgridRequestFormatterUtils.formatValue(fieldRef.getValue(), fieldRef.getSchema())));
        HashMap customProperties = Maps.newHashMap();
        record.getFields().stream().filter(fieldRef -> (Boolean)fieldRef.getParams().get("custom")).forEach(fieldRef -> customProperties.put(fieldRef.getName(), SendgridRequestFormatterUtils.formatValue(fieldRef.getValue(), fieldRef.getSchema())));
        reservedProperties.put("custom_fields", customProperties);
        return reservedProperties;
    }

    private void updateStats(long processed, long maxOffset) {
        this.syncStats.setRecordsProcessed(this.syncStats.getRecordsProcessed() + processed);
        this.syncStats.setOffset(Math.max(this.syncStats.getOffset(), maxOffset));
    }

    public AppSyncStats getSyncStats() {
        return this.syncStats;
    }
}

