/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.salesforce;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.castled.apps.DataSink;
import io.castled.apps.OAuthAppConfig;
import io.castled.apps.connectors.salesforce.SalesforceAppSyncConfig;
import io.castled.apps.connectors.salesforce.SalesforceErrorParser;
import io.castled.apps.connectors.salesforce.SalesforceFailedRecordsSchemaMapper;
import io.castled.apps.connectors.salesforce.SalesforceSinkConfig;
import io.castled.apps.connectors.salesforce.client.SFDCBulkClient;
import io.castled.apps.connectors.salesforce.client.SFDCRestClient;
import io.castled.apps.connectors.salesforce.client.dtos.ContentType;
import io.castled.apps.connectors.salesforce.client.dtos.InsertJobRequest;
import io.castled.apps.connectors.salesforce.client.dtos.Job;
import io.castled.apps.connectors.salesforce.client.dtos.JobRequest;
import io.castled.apps.connectors.salesforce.client.dtos.JobState;
import io.castled.apps.connectors.salesforce.client.dtos.JobStateUpdateRequest;
import io.castled.apps.connectors.salesforce.client.dtos.PkChunking;
import io.castled.apps.connectors.salesforce.client.dtos.UpsertJobRequest;
import io.castled.apps.models.DataSinkRequest;
import io.castled.apps.models.GenericSyncObject;
import io.castled.apps.syncconfigs.AppSyncConfig;
import io.castled.commons.models.AppSyncMode;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledException;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.functionalinterfaces.ThrowingConsumer;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.Field;
import io.castled.schema.models.FieldSchema;
import io.castled.schema.models.RecordSchema;
import io.castled.schema.models.Schema;
import io.castled.schema.models.Tuple;
import io.castled.utils.SizeUtils;
import io.castled.utils.ThreadUtils;
import io.castled.utils.TimeUtils;
import java.io.IOException;
import java.io.Reader;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.csv.QuoteMode;
import org.apache.commons.io.input.CharSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforceDataSink
implements DataSink {
    private static final Logger log = LoggerFactory.getLogger(SalesforceDataSink.class);
    private CSVPrinter csvPrinter;
    private List<String> trackableFields;
    private final List<Object> existingPrimaryKeyValues = Lists.newArrayList();
    private final AtomicLong skippedRecords = new AtomicLong(0L);
    private final AtomicLong processedRecords = new AtomicLong(0L);
    private final SalesforceSinkConfig salesforceSinkConfig;
    private final SalesforceFailedRecordsSchemaMapper salesforceFailedRecordsSchemaMapper;
    private final SalesforceErrorParser salesforceErrorParser;

    @Inject
    public SalesforceDataSink(SalesforceSinkConfig salesforceSinkConfig, SalesforceFailedRecordsSchemaMapper salesforceFailedRecordsSchemaMapper, SalesforceErrorParser salesforceErrorParser) {
        this.salesforceSinkConfig = salesforceSinkConfig;
        this.salesforceFailedRecordsSchemaMapper = salesforceFailedRecordsSchemaMapper;
        this.salesforceErrorParser = salesforceErrorParser;
    }

    @Override
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        DataSinkMessage message;
        StringBuilder recordsBuffer = null;
        OAuthAppConfig salesforceAppConfig = (OAuthAppConfig)dataSinkRequest.getExternalApp().getConfig();
        SFDCRestClient sfdcRestClient = new SFDCRestClient(salesforceAppConfig.getOAuthToken(), salesforceAppConfig.getClientConfig());
        this.computeExistingPrimaryKeysIfReqd(salesforceAppConfig, dataSinkRequest.getPrimaryKeys(), dataSinkRequest.getAppSyncConfig());
        long recordsBuffered = 0L;
        String primaryKey = dataSinkRequest.getPrimaryKeys().get(0);
        HashMap primaryKeyMessageMapper = Maps.newHashMap();
        while ((message = dataSinkRequest.getMessageInputStream().readMessage()) != null) {
            if (this.csvPrinter == null) {
                recordsBuffer = new StringBuilder();
                this.trackableFields = message.getRecord().getFields().stream().map(Field::getName).filter(dataSinkRequest.getMappedFields()::contains).collect(Collectors.toList());
                this.csvPrinter = new CSVPrinter((Appendable)recordsBuffer, CSVFormat.DEFAULT.withHeader(this.trackableFields.toArray(new String[0])).withQuoteMode(QuoteMode.ALL));
            }
            if (this.appendRecordToBuffer(message, dataSinkRequest.getAppSyncConfig(), dataSinkRequest.getPrimaryKeys())) {
                primaryKeyMessageMapper.putIfAbsent(message.getRecord().getValue(primaryKey), message);
                ++recordsBuffered;
            } else {
                this.skippedRecords.incrementAndGet();
            }
            if (recordsBuffered <= 0L || (long)recordsBuffer.length() <= SizeUtils.convertMBToBytes((long)this.salesforceSinkConfig.getRequestBufferThreshold())) continue;
            this.csvPrinter.flush();
            this.csvPrinter.close();
            this.uploadBufferedRecords(recordsBuffered, sfdcRestClient, dataSinkRequest.getAppSyncConfig(), recordsBuffer, dataSinkRequest.getObjectSchema(), dataSinkRequest.getErrorOutputStream(), primaryKey, primaryKeyMessageMapper);
            primaryKeyMessageMapper.clear();
            this.csvPrinter = null;
            recordsBuffered = 0L;
            recordsBuffer = null;
        }
        if (this.csvPrinter != null) {
            this.csvPrinter.flush();
            this.csvPrinter.close();
        }
        if (recordsBuffered > 0L) {
            this.uploadBufferedRecords(recordsBuffered, sfdcRestClient, dataSinkRequest.getAppSyncConfig(), recordsBuffer, dataSinkRequest.getObjectSchema(), dataSinkRequest.getErrorOutputStream(), primaryKey, primaryKeyMessageMapper);
        }
    }

    @Override
    public AppSyncStats getSyncStats() {
        return new AppSyncStats(this.processedRecords.get(), 0L, this.skippedRecords.get());
    }

    private void computeExistingPrimaryKeysIfReqd(OAuthAppConfig salesforceAppConfig, List<String> primaryKeys, AppSyncConfig appSyncConfig) throws Exception {
        SalesforceAppSyncConfig sfdcAppSyncConfig = (SalesforceAppSyncConfig)appSyncConfig;
        GenericSyncObject sfdcSyncObject = sfdcAppSyncConfig.getObject();
        if (sfdcAppSyncConfig.getMode() == AppSyncMode.UPDATE) {
            String primaryKey = primaryKeys.get(0);
            SFDCBulkClient sfdcBulkClient = new SFDCBulkClient(salesforceAppConfig.getOAuthToken(), salesforceAppConfig.getClientConfig());
            String query = String.format("select %s from %s", primaryKey, sfdcSyncObject.getObjectName());
            sfdcBulkClient.runBulkQuery(query, PkChunking.builder().chunkSize(50000).enabled(true).build(), sfdcSyncObject.getObjectName(), TimeUtils.minutesToMillis((long)10L), (ThrowingConsumer<Map<String, Object>>)((ThrowingConsumer)record -> this.existingPrimaryKeyValues.add(record.get(primaryKey))));
        }
    }

    private void uploadBufferedRecords(long recordsBuffered, SFDCRestClient sfdcRestClient, AppSyncConfig appSyncConfig, StringBuilder recordsBuffer, RecordSchema objectSchema, ErrorOutputStream errorOutputStream, String primaryKey, Map<Object, DataSinkMessage> messageMapper) throws Exception {
        Job job = sfdcRestClient.createJob(this.createJobRequest(appSyncConfig, primaryKey));
        sfdcRestClient.uploadCsv(job.getId(), recordsBuffer.toString());
        sfdcRestClient.updateJobState(job.getId(), new JobStateUpdateRequest(JobState.UPLOAD_COMPLETE));
        long startTime = System.currentTimeMillis();
        ThreadUtils.interruptIgnoredSleep((long)TimeUtils.secondsToMillis((long)10L));
        int iterations = 0;
        while (true) {
            job = sfdcRestClient.getJob(job.getId());
            if (Lists.newArrayList((Object[])new JobState[]{JobState.JOB_COMPLETE, JobState.ABORTED, JobState.FAILED}).contains((Object)job.getState())) break;
            if (System.currentTimeMillis() - startTime > TimeUtils.minutesToMillis((long)this.salesforceSinkConfig.getUploadTimeoutMins())) {
                throw new TimeoutException();
            }
            ThreadUtils.interruptIgnoredSleep((long)((long)(++iterations) * TimeUtils.secondsToMillis((long)5L)));
        }
        this.processFailedReport((Reader)new CharSequenceReader((CharSequence)sfdcRestClient.getFailedReport(job.getId())), objectSchema, errorOutputStream, primaryKey, messageMapper);
        this.processedRecords.addAndGet(recordsBuffered);
    }

    private void processFailedReport(Reader reportReader, RecordSchema objectSchema, ErrorOutputStream errorOutputStream, String primaryKey, Map<Object, DataSinkMessage> primaryKeyOffsetMapper) throws CastledException {
        try {
            CSVParser csvParser = new CSVParser(reportReader, CSVFormat.RFC4180.withHeader(new String[0]).withSkipHeaderRecord());
            for (CSVRecord csvRecord : csvParser) {
                String sfError = csvRecord.get("sf__Error");
                Tuple.Builder recordBuilder = Tuple.builder();
                for (FieldSchema fieldSchema : objectSchema.getFieldSchemas()) {
                    String valueStr;
                    if (!primaryKey.equals(fieldSchema.getName()) || !csvRecord.isSet(fieldSchema.getName()) || (valueStr = csvRecord.get(fieldSchema.getName())) == null) continue;
                    recordBuilder.put(fieldSchema, this.salesforceFailedRecordsSchemaMapper.transformValue(valueStr, fieldSchema.getSchema()));
                }
                Tuple record = recordBuilder.build();
                errorOutputStream.writeFailedRecord(primaryKeyOffsetMapper.get(record.getValue(primaryKey)), this.salesforceErrorParser.parseSalesforceError(sfError));
            }
        }
        catch (Exception e) {
            log.error("Process failed records failed", (Throwable)e);
            throw new CastledException((Throwable)e);
        }
    }

    private JobRequest createJobRequest(AppSyncConfig appSyncConfig, String primaryKey) {
        SalesforceAppSyncConfig sfdcAppSyncConfig = (SalesforceAppSyncConfig)appSyncConfig;
        GenericSyncObject sfdcSyncObject = ((SalesforceAppSyncConfig)appSyncConfig).getObject();
        switch (sfdcAppSyncConfig.getMode()) {
            case UPSERT: {
                return new UpsertJobRequest(sfdcSyncObject.getObjectName(), ContentType.CSV, primaryKey);
            }
            case INSERT: {
                return new InsertJobRequest(sfdcSyncObject.getObjectName(), ContentType.CSV);
            }
            case UPDATE: {
                return new UpsertJobRequest(sfdcSyncObject.getObjectName(), ContentType.CSV, primaryKey);
            }
        }
        throw new CastledRuntimeException("Unhandled app sync mode: " + (Object)((Object)sfdcAppSyncConfig.getMode()));
    }

    private boolean appendRecordToBuffer(DataSinkMessage message, AppSyncConfig appSyncConfig, List<String> primaryKeys) throws IOException {
        SalesforceAppSyncConfig sfdcAppSyncConfig = (SalesforceAppSyncConfig)appSyncConfig;
        if (sfdcAppSyncConfig.getMode() == AppSyncMode.UPDATE) {
            String primaryKey = primaryKeys.get(0);
            if (!this.existingPrimaryKeyValues.contains(message.getRecord().getValue(primaryKey))) {
                return false;
            }
        }
        ArrayList transformedValues = Lists.newArrayList();
        for (Field field : message.getRecord().getFields()) {
            if (!this.trackableFields.contains(field.getName())) continue;
            String transformedValue = Optional.ofNullable(this.transformValue(message.getRecord().getValue(field.getName()), field.getSchema())).orElse("#N/A");
            transformedValues.add(transformedValue);
        }
        this.csvPrinter.printRecord((Iterable)transformedValues);
        return true;
    }

    private String transformValue(Object value, Schema schema) {
        if (value == null) {
            return null;
        }
        if (SchemaUtils.isZonedTimestamp((Schema)schema)) {
            ZonedDateTime zonedDateTime = (ZonedDateTime)value;
            return zonedDateTime.format(DateTimeFormatter.ISO_DATE_TIME);
        }
        if (SchemaUtils.isDateSchema((Schema)schema)) {
            LocalDate localDate = (LocalDate)value;
            return localDate.format(DateTimeFormatter.ISO_DATE);
        }
        if (SchemaUtils.isTimeSchema((Schema)schema)) {
            LocalTime localTime = (LocalTime)value;
            return localTime.format(DateTimeFormatter.ISO_TIME);
        }
        return value.toString();
    }
}

