/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.intercom;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.apps.connectors.intercom.IntercomAppConfig;
import io.castled.apps.connectors.intercom.IntercomErrorParser;
import io.castled.apps.connectors.intercom.IntercomObject;
import io.castled.apps.connectors.intercom.IntercomObjectSink;
import io.castled.apps.connectors.intercom.client.IntercomRestClient;
import io.castled.apps.connectors.intercom.client.dtos.DataAttribute;
import io.castled.apps.connectors.intercom.client.exceptions.IntercomRestException;
import io.castled.apps.connectors.intercom.client.models.IntercomModel;
import io.castled.apps.models.PrimaryKeyIdMapper;
import io.castled.apps.syncconfigs.AppSyncConfig;
import io.castled.commons.errors.CastledError;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.models.ObjectIdAndMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetQueue;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.Field;
import io.castled.schema.models.MessageOffsetSupplier;
import io.castled.schema.models.Schema;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IntercomContactSink
implements IntercomObjectSink<String> {
    private static final Logger log = LoggerFactory.getLogger(IntercomContactSink.class);
    private IntercomRestClient intercomRestClient;
    private List<String> customAttributes;
    private PrimaryKeyIdMapper<String> primaryKeyIdMapper;
    private IntercomObject intercomObject;
    private List<String> primaryKeys;
    private final AtomicLong processedRecords = new AtomicLong(0L);
    private ErrorOutputStream errorOutputStream;
    private final IntercomErrorParser intercomErrorParser = new IntercomErrorParser();
    private final CastledOffsetQueue<ObjectIdAndMessage> recordsBuffer = new CastledOffsetQueue((Consumer)new ContactConsumer(), 20, 60, true);

    @Override
    public IntercomObjectSink<String> initialize(IntercomObject intercomObject, AppSyncConfig appSyncConfig, IntercomAppConfig intercomAppConfig, ErrorOutputStream errorOutputStream, List<String> primaryKeys) {
        this.intercomRestClient = new IntercomRestClient(intercomAppConfig.getAccessToken());
        this.intercomObject = intercomObject;
        this.primaryKeys = primaryKeys;
        this.customAttributes = this.intercomRestClient.listAttributes(IntercomModel.CONTACT).stream().filter(DataAttribute::isCustom).map(DataAttribute::getName).collect(Collectors.toList());
        this.primaryKeyIdMapper = this.constructPrimaryKeyIdMapper(appSyncConfig);
        this.errorOutputStream = errorOutputStream;
        return this;
    }

    @Override
    public PrimaryKeyIdMapper<String> getPrimaryKeyIdMapper() {
        return this.primaryKeyIdMapper;
    }

    @Override
    public void createObject(DataSinkMessage message) {
        try {
            this.recordsBuffer.writePayload((MessageOffsetSupplier)new ObjectIdAndMessage(null, message), 5, TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            log.error("Unable to publish record to records queue", (Throwable)e);
            this.errorOutputStream.writeFailedRecord(message, new UnclassifiedError("Internal error!! Unable to publish records to records queue. Please contact support"));
        }
    }

    @Override
    public void updateObject(String id, DataSinkMessage message) {
        try {
            this.recordsBuffer.writePayload((MessageOffsetSupplier)new ObjectIdAndMessage(id, message), 5, TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            log.error("Unable to publish record to records queue", (Throwable)e);
            this.errorOutputStream.writeFailedRecord(message, new UnclassifiedError("Internal error!! Unable to publish records to records queue. Please contact support"));
        }
    }

    @Override
    public void flushRecords() throws TimeoutException {
        this.recordsBuffer.flush(TimeUtils.minutesToMillis((long)10L));
    }

    @Override
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.recordsBuffer.getProcessedOffset());
    }

    private Map<String, Object> constructContactProperties(Tuple record) {
        HashMap recordProperties = Maps.newHashMap();
        for (Field field : record.getFields()) {
            Object value = record.getValue(field.getName());
            if (value == null || field.getName().equals("company_id")) continue;
            if (SchemaUtils.isZonedTimestamp((Schema)field.getSchema())) {
                recordProperties.put(field.getName(), ((ZonedDateTime)value).toEpochSecond());
                continue;
            }
            recordProperties.put(field.getName(), value);
        }
        return recordProperties;
    }

    private PrimaryKeyIdMapper<String> constructPrimaryKeyIdMapper(AppSyncConfig appSyncConfig) {
        HashMap primaryKeyToIdMapping = Maps.newHashMap();
        this.intercomRestClient.consumeContacts(properties -> {
            String id = (String)properties.get("id");
            primaryKeyToIdMapping.put(this.primaryKeys.stream().map(properties::get).collect(Collectors.toList()), id);
        });
        return new PrimaryKeyIdMapper<String>(primaryKeyToIdMapping);
    }

    private class ContactConsumer
    implements Consumer<ObjectIdAndMessage> {
        private ContactConsumer() {
        }

        @Override
        public void accept(ObjectIdAndMessage objectIdAndMessage) {
            if (objectIdAndMessage.getId() == null) {
                this.createContact(objectIdAndMessage.getMessage());
            } else {
                this.updateContact(objectIdAndMessage.getMessage(), objectIdAndMessage.getId());
            }
        }

        private void attachCompanyIfRequired(DataSinkMessage message, String contactId) throws IntercomRestException {
            String companyId = Optional.ofNullable(message.getRecord().getField("company_id")).map(Field::getValue).map(Object::toString).orElse(null);
            if (companyId == null) {
                return;
            }
            String internalCompanyId = IntercomContactSink.this.intercomRestClient.getIntercomCompanyId(companyId);
            IntercomContactSink.this.intercomRestClient.attachCompany(contactId, internalCompanyId);
        }

        private void createContact(DataSinkMessage message) {
            Map contactProperties = IntercomContactSink.this.constructContactProperties(message.getRecord());
            if (Lists.newArrayList((Object[])new IntercomObject[]{IntercomObject.LEAD, IntercomObject.USER}).contains((Object)IntercomContactSink.this.intercomObject)) {
                contactProperties.put("role", IntercomContactSink.this.intercomObject.getName().toLowerCase());
            }
            try {
                String contactId = IntercomContactSink.this.intercomRestClient.createContact(contactProperties, IntercomContactSink.this.customAttributes);
                this.attachCompanyIfRequired(message, contactId);
            }
            catch (IntercomRestException e) {
                CastledError pipelineError = IntercomContactSink.this.intercomErrorParser.parseIntercomError(e.getErrorResponse());
                IntercomContactSink.this.errorOutputStream.writeFailedRecord(message, pipelineError);
            }
            IntercomContactSink.this.processedRecords.incrementAndGet();
        }

        private void updateContact(DataSinkMessage message, String id) {
            Map contactProperties = IntercomContactSink.this.constructContactProperties(message.getRecord());
            if (Lists.newArrayList((Object[])new IntercomObject[]{IntercomObject.LEAD, IntercomObject.USER}).contains((Object)IntercomContactSink.this.intercomObject)) {
                contactProperties.put("role", IntercomContactSink.this.intercomObject.getName().toLowerCase());
            }
            try {
                String contactId = IntercomContactSink.this.intercomRestClient.updateContact(id, contactProperties, IntercomContactSink.this.customAttributes);
                this.attachCompanyIfRequired(message, contactId);
            }
            catch (IntercomRestException e) {
                CastledError pipelineError = IntercomContactSink.this.intercomErrorParser.parseIntercomError(e.getErrorResponse());
                IntercomContactSink.this.errorOutputStream.writeFailedRecord(message, pipelineError);
            }
            IntercomContactSink.this.processedRecords.incrementAndGet();
        }
    }
}

