/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.activecampaign;

import com.google.common.collect.Iterables;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.activecampaign.ActiveCampaignAppConfig;
import io.castled.apps.connectors.activecampaign.ActiveCampaignErrorParser;
import io.castled.apps.connectors.activecampaign.ActiveCampaignObjectFields;
import io.castled.apps.connectors.activecampaign.client.ActiveCampaignRestClient;
import io.castled.apps.connectors.activecampaign.constant.ActiveCampaignConstants;
import io.castled.apps.connectors.activecampaign.dto.Contact;
import io.castled.apps.connectors.activecampaign.dto.FieldValue;
import io.castled.apps.connectors.activecampaign.models.ContactAndError;
import io.castled.apps.models.GenericSyncObject;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.schema.models.Field;
import io.castled.schema.models.Tuple;
import io.castled.utils.StringUtils;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

@Singleton
public class ActiveCampaignAudienceSink
extends BufferedObjectSink<DataSinkMessage> {
    private final ActiveCampaignRestClient activeCampaignRestClient;
    private final ActiveCampaignErrorParser activeCampaignErrorParser;
    private final ErrorOutputStream errorOutputStream;
    private final GenericSyncObject audienceSyncObject;
    private final AtomicLong processedRecords = new AtomicLong(0L);
    private long lastProcessedOffset = 0L;

    public ActiveCampaignAudienceSink(ActiveCampaignAppConfig activeCampaignAppConfig, ErrorOutputStream errorOutputStream, GenericSyncObject audienceSyncObject) {
        this.activeCampaignRestClient = new ActiveCampaignRestClient(activeCampaignAppConfig.getApiURL(), activeCampaignAppConfig.getApiKey());
        this.errorOutputStream = errorOutputStream;
        this.activeCampaignErrorParser = (ActiveCampaignErrorParser)ObjectRegistry.getInstance(ActiveCampaignErrorParser.class);
        this.audienceSyncObject = audienceSyncObject;
    }

    @Override
    protected void writeRecords(List<DataSinkMessage> messages) {
        List<DataSinkMessage> messagesWithMissingEmails = messages.stream().filter(message -> StringUtils.nullIfEmpty((String)this.getEmail(message.getRecord())) == null).collect(Collectors.toList());
        messagesWithMissingEmails.forEach(message -> this.errorOutputStream.writeFailedRecord((DataSinkMessage)message, this.activeCampaignErrorParser.getMissingRequiredFieldError("email")));
        messages.removeAll(messagesWithMissingEmails);
        List<ContactAndError> failedRecords = this.activeCampaignRestClient.upsertContacts(messages.stream().map(DataSinkMessage::getRecord).map(this::constructContact).collect(Collectors.toList()));
        Map emailRecordMapper = messages.stream().filter(message -> this.getEmail(message.getRecord()) != null).collect(Collectors.toMap(message -> this.getEmail(message.getRecord()), Function.identity()));
        failedRecords.forEach(failedRecord -> failedRecord.getFailureReasons().forEach(failureReason -> this.errorOutputStream.writeFailedRecord((DataSinkMessage)emailRecordMapper.get(failedRecord.getContact().getEmail()), this.activeCampaignErrorParser.getPipelineError((String)failureReason))));
        this.processedRecords.addAndGet(messages.size());
        this.lastProcessedOffset = Math.max(this.lastProcessedOffset, ((DataSinkMessage)Iterables.getLast(messages)).getOffset());
    }

    private String getEmail(Tuple record) {
        return (String)record.getValue(ActiveCampaignObjectFields.CONTACTS_FIELDS.EMAIL.getFieldName());
    }

    private Contact constructContact(Tuple record) {
        String email = this.getEmail(record);
        String firstName = (String)record.getValue(ActiveCampaignObjectFields.CONTACTS_FIELDS.FIRST_NAME.getFieldName());
        String lastName = (String)record.getValue(ActiveCampaignObjectFields.CONTACTS_FIELDS.LAST_NAME.getFieldName());
        String phoneNumber = (String)record.getValue(ActiveCampaignObjectFields.CONTACTS_FIELDS.PHONE_NUMBER.getFieldName());
        List<FieldValue> fieldValues = record.getFields().stream().filter(fieldRef -> (Boolean)fieldRef.getParams().get(ActiveCampaignConstants.CUSTOM_FIELD_INDICATOR)).map(field -> FieldValue.builder().id(this.transformFieldId((Field)field)).value(this.transformFieldValue((Field)field)).build()).collect(Collectors.toList());
        return Contact.builder().email(email).first_name(firstName).last_name(lastName).phone(phoneNumber).fields(fieldValues).build();
    }

    private Integer transformFieldId(Field field) {
        return Optional.ofNullable(field.getParams().get(ActiveCampaignConstants.CUSTOM_FIELD_ID)).filter(objectRef -> objectRef instanceof Integer).orElse(null);
    }

    private String transformFieldValue(Field field) {
        Object object = field.getValue();
        if (object instanceof Integer) {
            return String.valueOf(object);
        }
        if (object instanceof String) {
            return (String)object;
        }
        if (object instanceof LocalDate) {
            return ((LocalDate)object).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        }
        if (object instanceof LocalDateTime) {
            return ((LocalDateTime)object).format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ"));
        }
        return null;
    }

    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.lastProcessedOffset);
    }

    @Override
    public long getMaxBufferedObjects() {
        return 250L;
    }
}

