/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.customerio;

import io.castled.apps.DataSink;
import io.castled.apps.connectors.customerio.CustomerIOAppSyncConfig;
import io.castled.apps.connectors.customerio.CustomerIOEventSink;
import io.castled.apps.connectors.customerio.CustomerIOObject;
import io.castled.apps.connectors.customerio.CustomerIOObjectSink;
import io.castled.apps.connectors.customerio.CustomerIOPersonSink;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.exceptions.CastledRuntimeException;
import java.util.List;
import java.util.Optional;

/**
 * {@link DataSink} for Customer.io.
 *
 * <p>Chooses a {@link CustomerIOEventSink} or a {@link CustomerIOPersonSink} from the
 * object configured on the request's {@link CustomerIOAppSyncConfig}, forwards every
 * message read from the request's input stream to that sink, and flushes the sink once
 * the stream is exhausted.
 */
public class CustomerIODataSink
implements DataSink {
    /** Sink chosen by the current/last {@link #syncRecords} call; {@code null} until the first call. */
    private volatile CustomerIOObjectSink<String> customerIOObjectSink;

    /**
     * Number of messages {@link #writeRecord} reported as not written.
     *
     * <p>Volatile so that {@link #getSyncStats()} observes it with the same visibility as
     * {@link #customerIOObjectSink}. The increment itself is not atomic.
     * NOTE(review): assumes {@code syncRecords} is never run concurrently on one instance - confirm.
     */
    private volatile long skippedRecords = 0L;

    /**
     * Reads messages from the request until {@code readMessage()} returns {@code null},
     * hands each one to the selected sink, then flushes the sink.
     *
     * @param dataSinkRequest request supplying the sync config, message stream and primary keys
     * @throws CastledRuntimeException if the configured object maps to no supported sink
     * @throws Exception whatever the message stream or the sink propagates
     */
    @Override
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        DataSinkMessage message;
        this.customerIOObjectSink = this.getObjectSink(dataSinkRequest);
        while ((message = dataSinkRequest.getMessageInputStream().readMessage()) != null) {
            if (!this.writeRecord(message, dataSinkRequest.getPrimaryKeys())) {
                ++this.skippedRecords;
            }
        }
        this.customerIOObjectSink.flushRecords();
    }

    /**
     * Builds the sink matching the object named in the request's sync config.
     *
     * @param dataSinkRequest request whose app sync config is a {@link CustomerIOAppSyncConfig}
     * @return a new event or person sink for the request
     * @throws CastledRuntimeException if the object name resolves to nothing, or to a
     *         {@link CustomerIOObject} other than {@code EVENT} or {@code PERSON}
     */
    private CustomerIOObjectSink<String> getObjectSink(DataSinkRequest dataSinkRequest) {
        CustomerIOAppSyncConfig syncConfig = (CustomerIOAppSyncConfig)dataSinkRequest.getAppSyncConfig();
        CustomerIOObject customerIOObject = CustomerIOObject.getObjectByName(syncConfig.getObject().getObjectName());
        // What getObjectByName does for an unknown name is not visible from this class.
        // Switching on null would raise a bare NullPointerException, so fail with the
        // same descriptive exception the default branch uses instead.
        if (customerIOObject == null) {
            throw new CastledRuntimeException(String.format("Invalid object type %s!", syncConfig.getObject().getObjectName()));
        }
        switch (customerIOObject) {
            case EVENT: {
                return new CustomerIOEventSink(dataSinkRequest);
            }
            case PERSON: {
                return new CustomerIOPersonSink(dataSinkRequest);
            }
            default: {
                throw new CastledRuntimeException(String.format("Invalid object type %s!", customerIOObject.getName()));
            }
        }
    }

    /**
     * Reports the selected sink's processed-record count and offset together with this
     * class's skipped-record count.
     *
     * @return the combined stats, or all-zero stats when no sink has been created yet or
     *         the sink has no stats to report
     */
    @Override
    public AppSyncStats getSyncStats() {
        return Optional.ofNullable(this.customerIOObjectSink)
                .map(CustomerIOObjectSink::getSyncStats)
                .map(statsRef -> new AppSyncStats(statsRef.getRecordsProcessed(), statsRef.getOffset(), this.skippedRecords))
                // orElseGet: only build the zero stats when they are actually needed.
                .orElseGet(() -> new AppSyncStats(0L, 0L, 0L));
    }

    /**
     * Passes one message to the selected sink.
     *
     * @param message message to create or update in Customer.io
     * @param primaryKeys primary keys of the request; currently unused
     * @return {@code true} if the message was handed to the sink; this implementation
     *         always returns {@code true}, so no record is ever counted as skipped
     */
    private boolean writeRecord(DataSinkMessage message, List<String> primaryKeys) {
        this.customerIOObjectSink.createOrUpdateObject(message);
        return true;
    }
}

