/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.hubspot.objectsinks;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.OAuthAppConfig;
import io.castled.apps.connectors.hubspot.HubspotErrorParser;
import io.castled.apps.connectors.hubspot.HubspotSyncObject;
import io.castled.apps.connectors.hubspot.client.HubspotRestClient;
import io.castled.apps.connectors.hubspot.client.dtos.BatchUpdateRequest;
import io.castled.apps.connectors.hubspot.client.dtos.ObjectUpdateRequest;
import io.castled.apps.connectors.hubspot.client.exception.BatchObjectException;
import io.castled.apps.connectors.hubspot.schemaMappers.HubspotApiSchemaMapper;
import io.castled.commons.errors.CastledError;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.models.ObjectIdAndMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetListQueue;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.IncompatibleValueException;
import io.castled.schema.SchemaMapper;
import io.castled.schema.models.Field;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffered sink that forwards {@link ObjectIdAndMessage} records to Hubspot.
 *
 * <p>Records handed to {@link #writeRecords(List)} are published to an internal
 * {@link CastledOffsetListQueue}; the queue's consumer ({@link ObjectUpdateConsumer})
 * sends them to Hubspot through {@link HubspotRestClient#updateObjects}. Records that
 * cannot be published or that fail in a batch call are reported to the supplied
 * {@link ErrorOutputStream}.
 */
public class HubspotObjectSink
extends BufferedObjectSink<ObjectIdAndMessage> {
    private static final Logger log = LoggerFactory.getLogger(HubspotObjectSink.class);

    /** Amount of time, in minutes, allowed for publishing a batch to the requests queue. */
    private static final int PUBLISH_TIMEOUT_MINUTES = 5;

    /** Duration, in minutes, passed (converted to millis) to the requests queue flush. */
    private static final long FLUSH_WAIT_MINUTES = 10L;

    private final CastledOffsetListQueue<ObjectIdAndMessage> requestsBuffer = new CastledOffsetListQueue((Consumer)new ObjectUpdateConsumer(), 5, 500, true);
    private final AtomicLong processedRecords = new AtomicLong(0L);
    private final HubspotRestClient hubspotRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final HubspotSyncObject hubspotObject;

    /**
     * Creates a sink for one Hubspot object type.
     *
     * @param oAuthAppConfig    supplies the OAuth token and client config used to build the REST client
     * @param errorOutputStream destination for records that could not be synced
     * @param hubspotObject     the Hubspot object whose type id is targeted by batch calls
     */
    public HubspotObjectSink(OAuthAppConfig oAuthAppConfig, ErrorOutputStream errorOutputStream, HubspotSyncObject hubspotObject) {
        this.errorOutputStream = errorOutputStream;
        this.hubspotObject = hubspotObject;
        this.hubspotRestClient = new HubspotRestClient(oAuthAppConfig.getOAuthToken(), oAuthAppConfig.getClientConfig());
    }

    /**
     * Builds the property map sent to Hubspot for a single record.
     *
     * <p>Every field of the record becomes one entry keyed by the field name. Non-null
     * values are converted with the {@link HubspotApiSchemaMapper}; null values are
     * replaced by an empty string.
     *
     * @param bufferedRecord the record to convert
     * @return field name to converted value
     * @throws CastledRuntimeException wrapping the {@link IncompatibleValueException} raised
     *                                 while converting a value
     */
    protected Map<String, Object> createObjectProperties(Tuple bufferedRecord) {
        SchemaMapper schemaMapper = (SchemaMapper)ObjectRegistry.getInstance(HubspotApiSchemaMapper.class);
        Map<String, Object> properties = Maps.newHashMap();
        try {
            for (Field field : bufferedRecord.getFields()) {
                String fieldName = field.getName();
                Object rawValue = bufferedRecord.getValue(fieldName);
                Object hubspotValue = rawValue == null ? "" : schemaMapper.transformValue(rawValue, field.getSchema());
                properties.put(fieldName, hubspotValue);
            }
        }
        catch (IncompatibleValueException e) {
            throw new CastledRuntimeException((Throwable)e);
        }
        return properties;
    }

    /**
     * Publishes a copy of the given records to the requests queue.
     *
     * <p>If publishing times out, the failure is logged and every record of the batch is
     * written to the error stream with an {@link UnclassifiedError}.
     *
     * @param records the records to publish
     */
    @Override
    protected void writeRecords(List<ObjectIdAndMessage> records) {
        try {
            this.requestsBuffer.writePayload((List)Lists.newArrayList(records), PUBLISH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            log.error("Unable to publish records to records queue", e);
            // A fresh error instance is created per record, as each failed record is reported individually.
            records.forEach(unpublished -> this.errorOutputStream.writeFailedRecord(unpublished.getMessage(), new UnclassifiedError("Internal error!! Unable to publish records to records queue. Please contact support")));
        }
    }

    /**
     * Returns the current sync statistics.
     *
     * @return stats built from the number of records handled by the consumer and the
     *         processed offset reported by the requests queue
     */
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.requestsBuffer.getProcessedOffset());
    }

    /**
     * Flushes the records buffered by the parent sink, then flushes the requests queue.
     *
     * @throws Exception if either flush fails
     */
    @Override
    public void flushRecords() throws Exception {
        super.flushRecords();
        this.requestsBuffer.flush(TimeUtils.minutesToMillis(FLUSH_WAIT_MINUTES));
    }

    /**
     * @return the maximum number of objects buffered by this sink, fixed at 10
     */
    @Override
    public long getMaxBufferedObjects() {
        return 10L;
    }

    /**
     * Queue consumer that sends each batch of records to Hubspot.
     */
    private class ObjectUpdateConsumer
    implements Consumer<List<ObjectIdAndMessage>> {

        /**
         * Splits the batch into records without an id and records with an id, sends the
         * id-less group first with the create flag set, then the remaining group with the
         * flag cleared, and finally adds the batch size to the processed counter.
         *
         * @param records the batch taken from the requests queue
         */
        @Override
        public void accept(List<ObjectIdAndMessage> records) {
            Map<Boolean, List<ObjectIdAndMessage>> byMissingId = records.stream()
                    .collect(Collectors.partitioningBy(candidate -> candidate.getId() == null));
            this.updateRecords(byMissingId.get(true), true);
            this.updateRecords(byMissingId.get(false), false);
            HubspotObjectSink.this.processedRecords.addAndGet(records.size());
        }

        /**
         * Sends one batch request for the given records; does nothing for a null or empty list.
         *
         * <p>On a {@link BatchObjectException} the parsed error is written to the error
         * stream for every record of the batch.
         *
         * @param records the records to send
         * @param create  flag forwarded to {@link HubspotRestClient#updateObjects}
         */
        private void updateRecords(List<ObjectIdAndMessage> records, boolean create) {
            if (records == null || records.isEmpty()) {
                return;
            }
            BatchUpdateRequest batchUpdateRequest = new BatchUpdateRequest(this.toUpdateRequests(records));
            try {
                HubspotObjectSink.this.hubspotRestClient.updateObjects(HubspotObjectSink.this.hubspotObject.getTypeId(), batchUpdateRequest, create);
            }
            catch (BatchObjectException e) {
                this.reportFailure(records, e);
            }
        }

        /**
         * Converts each record into an {@link ObjectUpdateRequest} carrying the record id
         * and the properties produced by {@link HubspotObjectSink#createObjectProperties(Tuple)}.
         */
        private List<ObjectUpdateRequest> toUpdateRequests(List<ObjectIdAndMessage> records) {
            return records.stream()
                    .map(entry -> new ObjectUpdateRequest(entry.getId(), HubspotObjectSink.this.createObjectProperties(entry.getMessage().getRecord())))
                    .collect(Collectors.toList());
        }

        /**
         * Logs the exception when it carries no batch error or no error category, then
         * writes the parsed error to the error stream for every record of the batch.
         */
        private void reportFailure(List<ObjectIdAndMessage> records, BatchObjectException e) {
            boolean uncategorized = e.getBatchObjectError() == null || e.getBatchObjectError().getCategory() == null;
            if (uncategorized) {
                log.error("Hubspot records update failed", e);
            }
            CastledError pipelineError = ((HubspotErrorParser)ObjectRegistry.getInstance(HubspotErrorParser.class)).parseError(e.getBatchObjectError());
            records.forEach(failed -> HubspotObjectSink.this.errorOutputStream.writeFailedRecord(failed.getMessage(), pipelineError));
        }
    }
}

