/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.intercom;

import com.google.common.collect.Maps;
import io.castled.apps.connectors.intercom.IntercomAppConfig;
import io.castled.apps.connectors.intercom.IntercomErrorParser;
import io.castled.apps.connectors.intercom.IntercomObject;
import io.castled.apps.connectors.intercom.IntercomObjectSink;
import io.castled.apps.connectors.intercom.client.IntercomRestClient;
import io.castled.apps.connectors.intercom.client.dtos.DataAttribute;
import io.castled.apps.connectors.intercom.client.exceptions.IntercomRestException;
import io.castled.apps.connectors.intercom.client.models.IntercomModel;
import io.castled.apps.models.PrimaryKeyIdMapper;
import io.castled.apps.syncconfigs.AppSyncConfig;
import io.castled.commons.errors.CastledError;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetQueue;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.Field;
import io.castled.schema.models.Message;
import io.castled.schema.models.MessageOffsetSupplier;
import io.castled.schema.models.Schema;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IntercomCompanySink
implements IntercomObjectSink<String> {
    private static final Logger log = LoggerFactory.getLogger(IntercomCompanySink.class);
    private IntercomRestClient intercomRestClient;
    private List<String> customAttributes;
    private PrimaryKeyIdMapper<String> primaryKeyIdMapper;
    private ErrorOutputStream errorOutputStream;
    private final IntercomErrorParser errorParser = new IntercomErrorParser();
    private final AtomicLong failedRecords = new AtomicLong(0L);
    private final AtomicLong processedRecords = new AtomicLong(0L);
    private final CastledOffsetQueue<Message> companyRecordsBuffer = new CastledOffsetQueue((Consumer)new CompanyRecordConsumer(), 2, 10, true);

    @Override
    public IntercomObjectSink<String> initialize(IntercomObject intercomObject, AppSyncConfig appSyncConfig, IntercomAppConfig intercomAppConfig, ErrorOutputStream errorOutputStream, List<String> primaryKeys) {
        this.intercomRestClient = new IntercomRestClient(intercomAppConfig.getAccessToken());
        this.customAttributes = this.intercomRestClient.listAttributes(IntercomModel.COMPANY).stream().filter(DataAttribute::isCustom).map(DataAttribute::getName).collect(Collectors.toList());
        this.primaryKeyIdMapper = this.constructPrimaryKeyIdMapper(appSyncConfig);
        this.errorOutputStream = errorOutputStream;
        return this;
    }

    @Override
    public PrimaryKeyIdMapper<String> getPrimaryKeyIdMapper() {
        return this.primaryKeyIdMapper;
    }

    @Override
    public void createObject(Message message) {
        try {
            this.companyRecordsBuffer.writePayload((MessageOffsetSupplier)message, 5, TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            log.error("Unable to publish record to records queue", (Throwable)e);
            this.errorOutputStream.writeFailedRecord(message, new UnclassifiedError("Internal error! Unable to publish records to records queue. Please contact support"));
        }
    }

    @Override
    public void updateObject(String id, Message message) {
    }

    @Override
    public void flushRecords() throws TimeoutException {
        this.companyRecordsBuffer.flush(TimeUtils.minutesToMillis((long)10L));
    }

    @Override
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.companyRecordsBuffer.getProcessedOffset());
    }

    private Map<String, Object> constructProperties(Tuple record) {
        HashMap recordProperties = Maps.newHashMap();
        for (Field field : record.getFields()) {
            Object value = record.getValue(field.getName());
            if (value == null) continue;
            if (SchemaUtils.isZonedTimestamp((Schema)field.getSchema())) {
                recordProperties.put(field.getName(), ((ZonedDateTime)value).toEpochSecond());
                continue;
            }
            recordProperties.put(field.getName(), value);
        }
        return recordProperties;
    }

    private PrimaryKeyIdMapper<String> constructPrimaryKeyIdMapper(AppSyncConfig appSyncConfig) {
        return new PrimaryKeyIdMapper<String>(Maps.newHashMap());
    }

    private class CompanyRecordConsumer
    implements Consumer<Message> {
        private CompanyRecordConsumer() {
        }

        @Override
        public void accept(Message message) {
            Map companyProperties = IntercomCompanySink.this.constructProperties(message.getRecord());
            try {
                IntercomCompanySink.this.intercomRestClient.createCompany(companyProperties, IntercomCompanySink.this.customAttributes);
            }
            catch (IntercomRestException e) {
                IntercomCompanySink.this.failedRecords.incrementAndGet();
                CastledError pipelineError = IntercomCompanySink.this.errorParser.parseIntercomError(e.getErrorResponse());
                IntercomCompanySink.this.errorOutputStream.writeFailedRecord(message, pipelineError);
            }
            IntercomCompanySink.this.processedRecords.incrementAndGet();
        }
    }
}

