/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.kafka;

import com.google.common.collect.Sets;
import io.castled.ObjectRegistry;
import io.castled.apps.DataSink;
import io.castled.apps.connectors.kafka.KafkaAppConfig;
import io.castled.apps.connectors.kafka.KafkaAppSyncConfig;
import io.castled.apps.connectors.kafka.KafkaErrorParser;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.kafka.producer.CastledKafkaProducer;
import io.castled.kafka.producer.CastledProducerCallback;
import io.castled.kafka.producer.KafkaProducerConfiguration;
import io.castled.schema.models.Message;
import io.castled.utils.MessageUtils;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * {@link DataSink} that publishes every message of a sync request to a Kafka topic.
 *
 * <p>Messages are handed to a {@link CastledKafkaProducer} together with a
 * {@link DataSinkCallback}; the callback updates the progress counters once the
 * producer reports the outcome of a record. The mutable state below is kept in
 * atomic / concurrent / volatile holders because it is written from the callbacks
 * as well as from {@link #syncRecords(DataSinkRequest)}.
 * NOTE(review): callbacks presumably run on the producer's own thread - confirm
 * against {@code CastledKafkaProducer}.
 */
public class KafkaDataSink
implements DataSink {
    /** Number of records whose outcome is known: acknowledged, or rejected at publish time. */
    private final AtomicLong recordsProcessed = new AtomicLong(0L);
    /** Offsets handed to the producer for which no success callback has arrived yet. */
    private final Set<Long> pendingMessageIds = Sets.newConcurrentHashSet();
    /**
     * Offset of the last message handed to the producer. Volatile so that
     * {@link #getProcessedOffset()} sees the latest value when it is called from a
     * thread other than the one running {@link #syncRecords(DataSinkRequest)}.
     */
    private volatile long lastBufferedOffset = 0L;
    /** Most recent failure reported through {@link DataSinkCallback#onFailure}; {@code null} if none. */
    private volatile Exception throwable;

    /**
     * Reads messages from the request's input stream until it returns {@code null}
     * and publishes each one to the configured topic, then flushes the producer.
     *
     * @param dataSinkRequest request carrying the Kafka app config, the sync config,
     *                        the message input stream and the error output stream
     * @throws Exception the failure recorded by a producer callback, if any, or
     *                   whatever the input stream / producer itself throws
     */
    @Override
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        KafkaAppConfig kafkaAppConfig = (KafkaAppConfig)dataSinkRequest.getExternalApp().getConfig();
        KafkaAppSyncConfig kafkaAppSyncConfig = (KafkaAppSyncConfig)dataSinkRequest.getAppSyncConfig();
        KafkaProducerConfiguration producerConfiguration = KafkaProducerConfiguration.builder()
                .bootstrapServers(kafkaAppConfig.getBootstrapServers())
                .build();
        try (CastledKafkaProducer kafkaProducer = new CastledKafkaProducer(producerConfiguration)) {
            Message message;
            while ((message = dataSinkRequest.getMessageInputStream().readMessage()) != null) {
                // Stop early if an earlier record has already been reported as failed.
                this.validateAndThrow();
                // Register the offset as pending BEFORE publishing so the success
                // callback always finds it in the set.
                this.pendingMessageIds.add(message.getOffset());
                this.publishMessage(kafkaProducer, message, kafkaAppSyncConfig.getTopic(), dataSinkRequest.getErrorOutputStream());
                this.lastBufferedOffset = message.getOffset();
            }
            kafkaProducer.flush();
            // Surface a failure reported for any of the records flushed above.
            this.validateAndThrow();
        }
    }

    /**
     * Hands one message to the producer. If the publish call itself throws, the
     * message is treated as processed-with-error: it is removed from the pending
     * set, counted, and written to the error stream instead of aborting the sync.
     */
    private void publishMessage(CastledKafkaProducer kafkaProducer, Message message, String topic, ErrorOutputStream errorOutputStream) {
        try {
            kafkaProducer.publish(new ProducerRecord(topic, null, (Object)MessageUtils.messageToBytes(message)), new DataSinkCallback(message.getOffset()));
        }
        catch (Exception e) {
            this.pendingMessageIds.remove(message.getOffset());
            this.recordsProcessed.incrementAndGet();
            errorOutputStream.writeFailedRecord(message, ((KafkaErrorParser)ObjectRegistry.getInstance(KafkaErrorParser.class)).parseException(e));
        }
    }

    /**
     * Rethrows the failure recorded by a producer callback, if there is one.
     *
     * @throws Exception the recorded failure
     */
    private void validateAndThrow() throws Exception {
        Exception recordedFailure = this.throwable;
        if (recordedFailure != null) {
            throw recordedFailure;
        }
    }

    /**
     * Builds a stats snapshot from the processed-record count and the processed offset.
     * The third constructor argument is always {@code 0L}.
     */
    @Override
    public AppSyncStats getSyncStats() {
        return new AppSyncStats(this.recordsProcessed.get(), this.getProcessedOffset(), 0L);
    }

    /**
     * Returns the offset up to which messages are no longer pending.
     *
     * @return the smallest pending offset minus one, or the offset of the last
     *         buffered message when nothing is pending
     */
    public long getProcessedOffset() {
        // Snapshot the buffered offset BEFORE scanning the pending set. Reading it
        // afterwards could pick up a message that was buffered after the set was
        // seen empty, reporting an offset that has not been acknowledged yet.
        long bufferedSnapshot = this.lastBufferedOffset;
        boolean anyPending = false;
        long smallestPending = Long.MAX_VALUE;
        for (Long pendingOffset : this.pendingMessageIds) {
            anyPending = true;
            smallestPending = Math.min(smallestPending, pendingOffset);
        }
        return anyPending ? smallestPending - 1L : bufferedSnapshot;
    }

    /**
     * Producer callback bound to the offset of a single published message.
     */
    public class DataSinkCallback
    implements CastledProducerCallback {
        private final long messageOffset;

        /**
         * @param messageOffset offset of the message this callback reports on
         */
        public DataSinkCallback(long messageOffset) {
            this.messageOffset = messageOffset;
        }

        /** Counts the record as processed and clears its offset from the pending set. */
        public void onSuccess(RecordMetadata recordMetadata) {
            KafkaDataSink.this.recordsProcessed.incrementAndGet();
            KafkaDataSink.this.pendingMessageIds.remove(this.messageOffset);
        }

        /**
         * Records the failure so that {@code syncRecords} rethrows it at its next
         * check. The offset is intentionally left in the pending set, which keeps
         * {@code getProcessedOffset()} below the failed message.
         */
        public void onFailure(RecordMetadata recordMetadata, Exception e) {
            KafkaDataSink.this.throwable = e;
        }
    }
}

