/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.fbcustomaudience;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.fbcustomaudience.FbAppConfig;
import io.castled.apps.connectors.fbcustomaudience.FbCustomAudAppSyncConfig;
import io.castled.apps.connectors.fbcustomaudience.FbCustomAudienceFormatUtils;
import io.castled.apps.connectors.fbcustomaudience.FbCustomerErrors;
import io.castled.apps.connectors.fbcustomaudience.FbErrorParser;
import io.castled.apps.connectors.fbcustomaudience.client.FbRestClient;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.Field;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Buffered sink that forwards batches of {@link DataSinkMessage}s to
 * {@link FbRestClient#addCustomerList} and reports per-row failures to an
 * {@link ErrorOutputStream}.
 *
 * <p>Running totals (records processed and highest offset seen) are kept in an
 * {@link AppSyncStats} instance exposed through {@link #getSyncStats()}.
 */
public class FbCustomAudienceCustomerSink extends BufferedObjectSink<DataSinkMessage> {
    /** Upper bound on buffered messages, as reported by {@link #getMaxBufferedObjects()}. */
    private static final long BATCH_SIZE_MAX = 10000L;

    private final FbCustomAudAppSyncConfig syncConfig;
    private final FbRestClient fbRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final AppSyncStats syncStats;
    private final FbErrorParser errorParser;

    /**
     * Creates a sink bound to the given app and sync configuration.
     *
     * @param appConfig         app configuration handed to the REST client
     * @param syncConfig        sync configuration; also decides whether values are hashed
     * @param errorOutputStream destination for records reported as failed
     */
    public FbCustomAudienceCustomerSink(FbAppConfig appConfig, FbCustomAudAppSyncConfig syncConfig, ErrorOutputStream errorOutputStream) {
        this.syncConfig = syncConfig;
        this.fbRestClient = new FbRestClient(appConfig, syncConfig);
        this.syncStats = new AppSyncStats();
        this.errorOutputStream = errorOutputStream;
        this.errorParser = (FbErrorParser) ObjectRegistry.getInstance(FbErrorParser.class);
    }

    /**
     * Uploads one batch and routes any rows reported as invalid to the error stream.
     *
     * <p>After the upload, the processed count grows by the batch size and the offset
     * advances to that of the last message in the batch (never moving backwards).
     *
     * @param msgs the batch to upload
     * @throws CastledRuntimeException if {@code msgs} is empty (raised by {@link #getSchema})
     */
    @Override
    protected void writeRecords(List<DataSinkMessage> msgs) {
        List<String> schema = this.getSchema(msgs);
        List<List<String>> data = this.getData(msgs);
        FbCustomerErrors errors = this.fbRestClient.addCustomerList(schema, data);
        for (DataSinkMessage msg : msgs) {
            // A row is considered failed only when the response carries an entry for its key.
            String errorMsg = errors.invalidEntrySamples.get(this.getRowKey(msg));
            if (errorMsg != null) {
                this.errorOutputStream.writeFailedRecord(msg, this.errorParser.getPipelineError(errorMsg));
            }
        }
        this.updateStats(msgs.size(), Iterables.getLast(msgs).getOffset());
    }

    /**
     * Returns the maximum number of messages buffered before a flush.
     *
     * @return {@link #BATCH_SIZE_MAX}
     */
    @Override
    public long getMaxBufferedObjects() {
        return BATCH_SIZE_MAX;
    }

    /**
     * Builds the lookup key for a message: its field values, each wrapped in double
     * quotes, comma-separated and enclosed in square brackets, e.g. {@code ["a","b"]}.
     *
     * <p>A record without fields yields {@code []}.
     *
     * <p>NOTE(review): the key is built from the raw field values, even when hashed
     * values are uploaded; presumably this matches the keys of
     * {@code invalidEntrySamples} - confirm against {@link FbRestClient}.
     *
     * @param msg the message whose record fields form the key
     * @return the bracketed, quoted, comma-separated field values
     */
    protected String getRowKey(DataSinkMessage msg) {
        return msg.getRecord().getFields().stream()
                .map(field -> String.format("\"%s\"", field.getValue()))
                .collect(Collectors.joining(",", "[", "]"));
    }

    /**
     * Derives the schema (ordered field names) from the first message of the batch.
     *
     * @param records the batch; must contain at least one message
     * @return the field names of the first record, in field order
     * @throws CastledRuntimeException if {@code records} is empty
     */
    protected List<String> getSchema(List<DataSinkMessage> records) {
        DataSinkMessage first = records.stream()
                .findFirst()
                .orElseThrow(() -> new CastledRuntimeException("Empty records list!"));
        return first.getRecord().getFields().stream()
                .map(Field::getName)
                .collect(Collectors.toList());
    }

    /**
     * Converts every message into a row of string values, in field order.
     *
     * <p>When the sync config requires hashing, each value is first formatted and then
     * hashed via {@link FbCustomAudienceFormatUtils}; otherwise values are passed
     * through unchanged.
     *
     * @param msgs the batch to convert
     * @return one list of values per message, in the same order as {@code msgs}
     */
    protected List<List<String>> getData(List<DataSinkMessage> msgs) {
        List<List<String>> data = new ArrayList<>(msgs.size());
        for (DataSinkMessage msg : msgs) {
            data.add(this.toTuple(msg));
        }
        return data;
    }

    /** Maps a single message to its row of (optionally formatted and hashed) values. */
    private List<String> toTuple(DataSinkMessage msg) {
        if (this.syncConfig.isHashingRequired()) {
            return msg.getRecord().getFields().stream()
                    .map(field -> FbCustomAudienceFormatUtils.hashValue(
                            FbCustomAudienceFormatUtils.formatValue(field.getValue(), field.getName()),
                            field.getName()))
                    .collect(Collectors.toList());
        }
        // NOTE(review): the cast fails with ClassCastException for non-String values;
        // kept as in the original - confirm upstream guarantees String fields here.
        return msg.getRecord().getFields().stream()
                .map(field -> (String) field.getValue())
                .collect(Collectors.toList());
    }

    /** Adds {@code processed} to the running count and raises the offset to {@code maxOffset} if larger. */
    private void updateStats(long processed, long maxOffset) {
        this.syncStats.setRecordsProcessed(this.syncStats.getRecordsProcessed() + processed);
        this.syncStats.setOffset(Math.max(this.syncStats.getOffset(), maxOffset));
    }

    /**
     * Returns the statistics accumulated by this sink so far.
     *
     * @return the live {@link AppSyncStats} instance (not a copy)
     */
    public AppSyncStats getSyncStats() {
        return this.syncStats;
    }
}

