/*
 * Decompiled with CFR 0.152.
 */
package io.castled.apps.connectors.googlepubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.castled.apps.DataSink;
import io.castled.apps.connectors.googlepubsub.GooglePubSubAppConfig;
import io.castled.apps.connectors.googlepubsub.GooglePubSubAppSyncConfig;
import io.castled.apps.connectors.googlepubsub.GooglePubSubCredentialsProvider;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.schema.models.Message;
import io.castled.utils.MessageUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/**
 * {@link DataSink} that publishes every record of a sync request to a Google Pub/Sub topic.
 *
 * <p>Messages are handed to a batching {@link Publisher}; every {@link #FLUSH_BATCH_SIZE}
 * messages (and once more at end of input) the sink forces the outstanding batches out and
 * blocks until all of their publish futures have completed.
 *
 * <p>Progress is tracked through the offsets of in-flight messages: an offset is added to
 * {@code pendingMessageIds} before the message is published and removed only when the publish
 * succeeds. A failed publish leaves its offset in the set, so {@link #getProcessedOffset()}
 * never advances past a message that was not delivered.
 */
public class GooglePubSubDataSink
implements DataSink {
    private static final Logger log = LoggerFactory.getLogger(GooglePubSubDataSink.class);
    /** Publisher batching threshold: maximum request size in bytes (10 MiB). */
    public static final long REQUEST_BYTES_THRESHOLD = 0xA00000L;
    /** Publisher batching threshold: maximum number of messages per batch. */
    public static final long MESSAGE_COUNT_BATCH_SIZE = 1000L;
    /** Publisher batching threshold: maximum delay, in milliseconds, before a batch is sent. */
    public static final int PUBLISH_DELAY_THRESHOLD = 1000;
    /** Number of buffered publish futures after which the sink flushes and waits for them. */
    private static final long FLUSH_BATCH_SIZE = 10000L;
    /** Publisher shutdown timeout, in minutes. */
    private static final long SHUTDOWN_TIMEOUT_MINUTES = 1L;

    private final AtomicLong recordsProcessed = new AtomicLong(0L);
    /** Offsets of messages handed to the publisher whose publish has not (yet) succeeded. */
    private final Set<Long> pendingMessageIds = Sets.newConcurrentHashSet();
    /** Offset of the last message handed to the publisher; volatile because stats may be read from another thread. */
    private volatile long lastBufferedOffset = 0L;
    /** Most recent publish failure reported by a callback, or {@code null} if none. */
    private volatile Exception exception;

    /**
     * Reads all messages from the request's input stream and publishes them to the configured
     * Pub/Sub topic, flushing every {@link #FLUSH_BATCH_SIZE} messages.
     *
     * @param dataSinkRequest request carrying the app config, the sync config and the message stream
     * @throws Exception if building the publisher, reading a message, or publishing fails
     */
    @Override
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        GooglePubSubAppConfig googlePubSubAppConfig = (GooglePubSubAppConfig) dataSinkRequest.getExternalApp().getConfig();
        GooglePubSubAppSyncConfig googlePubSubAppSyncConfig = (GooglePubSubAppSyncConfig) dataSinkRequest.getAppSyncConfig();
        TopicName topicName = TopicName.of(googlePubSubAppConfig.getProjectID(), googlePubSubAppSyncConfig.getObject().getTopicId());
        Publisher publisher = null;
        List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
        try {
            BatchingSettings batchingSettings = BatchingSettings.newBuilder()
                    .setElementCountThreshold(MESSAGE_COUNT_BATCH_SIZE)
                    .setRequestByteThreshold(REQUEST_BYTES_THRESHOLD)
                    .setDelayThreshold(Duration.ofMillis(PUBLISH_DELAY_THRESHOLD))
                    .build();
            publisher = Publisher.newBuilder(topicName)
                    .setBatchingSettings(batchingSettings)
                    .setCredentialsProvider(new GooglePubSubCredentialsProvider(googlePubSubAppConfig.getServiceAccountDetails()))
                    .build();

            DataSinkMessage message;
            while ((message = dataSinkRequest.getMessageInputStream().readMessage()) != null) {
                messageIdFutures.add(this.publishMessage(publisher, message));
                if (messageIdFutures.size() >= FLUSH_BATCH_SIZE) {
                    this.publishOutstanding(publisher, messageIdFutures);
                    messageIdFutures.clear();
                }
            }
            // Flush whatever is left after the input stream is exhausted.
            this.publishOutstanding(publisher, messageIdFutures);
        }
        finally {
            if (publisher != null) {
                publisher.shutdown();
                if (!publisher.awaitTermination(SHUTDOWN_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                    log.warn("Pub/Sub publisher for topic {} did not terminate within {} minute(s)", topicName, SHUTDOWN_TIMEOUT_MINUTES);
                }
            }
        }
    }

    /**
     * Forces all batched messages out and blocks until every given publish future has completed.
     *
     * @param publisher publisher holding the batched messages
     * @param messageIdFutures publish futures to wait for; a {@code null} or empty list is a no-op
     * @throws Exception if any publish failed
     */
    private void publishOutstanding(Publisher publisher, List<ApiFuture<String>> messageIdFutures) throws Exception {
        if (CollectionUtils.isEmpty(messageIdFutures)) {
            return;
        }
        publisher.publishAllOutstanding();
        ApiFutures.allAsList(messageIdFutures).get();
        this.validateAndThrow();
    }

    /**
     * Hands a single message to the publisher and registers the callback that tracks its outcome.
     *
     * @param publisher publisher to send the message through
     * @param message message to publish; its offset is recorded as pending until the publish succeeds
     * @return future resolving to the Pub/Sub message id
     * @throws Exception if a previously published message has already been reported as failed
     */
    private ApiFuture<String> publishMessage(Publisher publisher, DataSinkMessage message) throws Exception {
        long offset = message.getOffset();
        // Register as pending BEFORE publishing so the processed offset can never skip this message.
        this.pendingMessageIds.add(offset);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                .setData(ByteString.copyFrom(MessageUtils.messageToBytes(message.getMessage())))
                .build();
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        this.lastBufferedOffset = offset;
        ApiFutures.addCallback(messageIdFuture, new DataSinkCallback(offset), MoreExecutors.directExecutor());
        this.validateAndThrow();
        return messageIdFuture;
    }

    /**
     * @return stats built from the number of successfully published records and the processed offset
     */
    @Override
    public AppSyncStats getSyncStats() {
        return new AppSyncStats(this.recordsProcessed.get(), this.getProcessedOffset(), 0L);
    }

    /**
     * Returns the offset up to which messages have been published successfully: one less than
     * the smallest pending offset, or the last buffered offset when nothing is pending.
     *
     * @return the processed offset
     */
    public long getProcessedOffset() {
        // Snapshot the last buffered offset BEFORE scanning the pending set: every offset up to
        // this value was added to the set before the field was written, so if the scan finds
        // nothing pending, everything up to the snapshot has been acknowledged. Reading the field
        // after the scan could report an offset that was buffered in between and is still in flight.
        long bufferedOffsetSnapshot = this.lastBufferedOffset;
        boolean hasPending = false;
        long minPendingOffset = Long.MAX_VALUE;
        for (Long pendingOffset : this.pendingMessageIds) {
            hasPending = true;
            if (pendingOffset < minPendingOffset) {
                minPendingOffset = pendingOffset;
            }
        }
        return hasPending ? minPendingOffset - 1L : bufferedOffsetSnapshot;
    }

    /**
     * Rethrows the publish failure recorded by a callback, if there is one.
     *
     * @throws Exception the recorded publish failure
     */
    private void validateAndThrow() throws Exception {
        Exception failure = this.exception;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Publish callback that updates the sink's progress tracking for one message.
     */
    public class DataSinkCallback
    implements ApiFutureCallback<String> {
        private final long messageOffset;

        /**
         * @param messageOffset offset of the message this callback is attached to
         */
        public DataSinkCallback(long messageOffset) {
            this.messageOffset = messageOffset;
        }

        /**
         * Records the failure so the sync thread can rethrow it. The offset is intentionally left
         * in the pending set so the processed offset does not move past the failed message.
         */
        @Override
        public void onFailure(Throwable throwable) {
            // A failure is not guaranteed to be an Exception (it may be an Error); wrap instead
            // of casting so the callback itself cannot fail with a ClassCastException.
            GooglePubSubDataSink.this.exception = throwable instanceof Exception
                    ? (Exception) throwable
                    : new RuntimeException(throwable);
        }

        /**
         * Counts the record as processed and clears its offset from the pending set.
         */
        @Override
        public void onSuccess(String messageId) {
            GooglePubSubDataSink.this.recordsProcessed.incrementAndGet();
            GooglePubSubDataSink.this.pendingMessageIds.remove(this.messageOffset);
        }
    }
}

