/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.customerio;

import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.apps.connectors.customerio.CustomerIOAppConfig;
import io.castled.apps.connectors.customerio.CustomerIOAppSyncConfig;
import io.castled.apps.connectors.customerio.CustomerIOErrorParser;
import io.castled.apps.connectors.customerio.CustomerIOObjectSink;
import io.castled.apps.connectors.customerio.client.CustomerIORestClient;
import io.castled.apps.models.DataSinkRequest;
import io.castled.apps.models.GenericSyncObject;
import io.castled.commons.errors.CastledError;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetQueue;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.Field;
import io.castled.schema.models.MessageOffsetSupplier;
import io.castled.schema.models.Schema;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Customer.io sink that delivers each incoming record as an event.
 *
 * <p>Messages passed to {@link #createOrUpdateObject(DataSinkMessage)} are written to a
 * {@link CastledOffsetQueue}; the queue hands them to {@link EventConsumer}, which converts the
 * record into a property map and sends it through
 * {@code CustomerIORestClient.insertEventDetails}. Records that fail to be queued or sent are
 * written to the request's {@link ErrorOutputStream}.
 */
public class CustomerIOEventSink
implements CustomerIOObjectSink<String> {
    private static final Logger log = LoggerFactory.getLogger(CustomerIOEventSink.class);

    /** Formatter for {@link LocalDate} values; built once instead of once per field. */
    private static final DateTimeFormatter LOCAL_DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Formatter for {@link LocalDateTime} values; built once instead of once per field. */
    private static final DateTimeFormatter LOCAL_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    private final ErrorOutputStream errorOutputStream;
    private final CustomerIOAppSyncConfig syncConfig;
    private final AppSyncStats syncStats;
    private final List<String> primaryKeys;
    private final List<String> mappedFields;
    private final CustomerIORestClient customerIORestClient;
    private final CustomerIOErrorParser customerIOErrorParser;
    private final GenericSyncObject audienceSyncObject;
    /** Number of messages whose event insert threw an exception. */
    private final AtomicLong failedRecords = new AtomicLong(0L);
    /** Number of messages handled by the consumer, whether the insert succeeded or failed. */
    private final AtomicLong processedRecords = new AtomicLong(0L);
    private long lastProcessedOffset = 0L;
    // NOTE(review): the meaning of the (10, 10, true) arguments is defined by CastledOffsetQueue's
    // constructor, which is not visible here - confirm against that class before tuning them.
    private final CastledOffsetQueue<DataSinkMessage> recordsBuffer = new CastledOffsetQueue((Consumer)new EventConsumer(), 10, 10, true);

    /**
     * Builds the sink from a sync request: creates the REST client from the external app's
     * site id and API key, and captures the error stream, sync config, primary keys and mapped
     * fields carried by the request.
     *
     * @param dataSinkRequest request describing the external app, sync configuration and
     *                        error output stream for this sync
     */
    public CustomerIOEventSink(DataSinkRequest dataSinkRequest) {
        CustomerIOAppConfig appConfig = (CustomerIOAppConfig)dataSinkRequest.getExternalApp().getConfig();
        this.customerIORestClient = new CustomerIORestClient(appConfig.getSiteId(), appConfig.getApiKey());
        this.errorOutputStream = dataSinkRequest.getErrorOutputStream();
        this.customerIOErrorParser = (CustomerIOErrorParser)ObjectRegistry.getInstance(CustomerIOErrorParser.class);
        this.audienceSyncObject = ((CustomerIOAppSyncConfig)dataSinkRequest.getAppSyncConfig()).getObject();
        this.syncConfig = (CustomerIOAppSyncConfig)dataSinkRequest.getAppSyncConfig();
        this.primaryKeys = dataSinkRequest.getPrimaryKeys();
        this.syncStats = new AppSyncStats(0L, 0L, 0L);
        this.mappedFields = dataSinkRequest.getMappedFields();
    }

    /**
     * Queues the message for delivery. If the queue does not accept it within five minutes the
     * message is reported to the error stream as an unclassified internal error instead of
     * being thrown to the caller.
     *
     * @param message message to deliver as a Customer.io event
     */
    @Override
    public void createOrUpdateObject(DataSinkMessage message) {
        try {
            this.recordsBuffer.writePayload((MessageOffsetSupplier)message, 5, TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            log.error("Unable to publish record to records queue", e);
            this.errorOutputStream.writeFailedRecord(message, new UnclassifiedError("Internal error! Unable to publish records to records queue. Please contact support"));
        }
    }

    /**
     * Flushes the records queue, allowing up to ten minutes.
     *
     * @throws TimeoutException propagated from the queue's {@code flush} call
     */
    @Override
    public void flushRecords() throws TimeoutException {
        this.recordsBuffer.flush(TimeUtils.minutesToMillis(10L));
    }

    /**
     * @return stats built from the number of messages handled so far and the queue's processed
     *         offset
     */
    @Override
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.recordsBuffer.getProcessedOffset());
    }

    /**
     * Converts a record into the property map sent with the event. Null values are skipped.
     * Exactly one representation is stored per field:
     * <ul>
     *   <li>fields whose schema is a zoned timestamp: epoch seconds;</li>
     *   <li>{@link LocalDate} values: {@code yyyy-MM-dd} string;</li>
     *   <li>{@link LocalDateTime} values: {@code yyyy-MM-dd'T'HH:mm:ss} string;</li>
     *   <li>anything else: the value unchanged.</li>
     * </ul>
     *
     * @param record record to convert
     * @return mutable map from field name to the converted value
     */
    private Map<String, Object> constructProperties(Tuple record) {
        Map<String, Object> recordProperties = Maps.newHashMap();
        for (Field field : record.getFields()) {
            String fieldName = field.getName();
            Object value = record.getValue(fieldName);
            if (value == null) {
                continue;
            }
            // The branches must be mutually exclusive: previously the zoned-timestamp branch fell
            // through and the raw ZonedDateTime overwrote the epoch-seconds value just stored.
            if (SchemaUtils.isZonedTimestamp((Schema)field.getSchema())) {
                recordProperties.put(fieldName, ((ZonedDateTime)value).toEpochSecond());
            } else if (value instanceof LocalDate) {
                recordProperties.put(fieldName, ((LocalDate)value).format(LOCAL_DATE_FORMATTER));
            } else if (value instanceof LocalDateTime) {
                recordProperties.put(fieldName, ((LocalDateTime)value).format(LOCAL_DATE_TIME_FORMATTER));
            } else {
                recordProperties.put(fieldName, value);
            }
        }
        return recordProperties;
    }

    /**
     * Queue consumer that sends one message to Customer.io as an event. Any exception from the
     * insert call is logged, counted as a failure and written to the error stream; the message
     * is counted as processed in either case.
     */
    private class EventConsumer
    implements Consumer<DataSinkMessage> {
        private EventConsumer() {
        }

        @Override
        public void accept(DataSinkMessage message) {
            Map<String, Object> eventProperties = CustomerIOEventSink.this.constructProperties(message.getRecord());
            try {
                CustomerIOEventSink.this.customerIORestClient.insertEventDetails(eventProperties, CustomerIOEventSink.this.primaryKeys);
            }
            catch (Exception e) {
                // Pass the exception as the trailing argument so the stack trace is logged too.
                log.error("Error : {}", e.toString(), e);
                CustomerIOEventSink.this.failedRecords.incrementAndGet();
                CastledError pipelineError = CustomerIOEventSink.this.customerIOErrorParser.getPipelineError(e.getLocalizedMessage());
                CustomerIOEventSink.this.errorOutputStream.writeFailedRecord(message, pipelineError);
            }
            CustomerIOEventSink.this.processedRecords.incrementAndGet();
        }
    }
}

