/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kinesis;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.Recycler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kinesis.Backoff;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig;
import org.apache.pulsar.io.kinesis.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="kinesis", type=IOType.SINK, help="A sink connector that copies messages from Pulsar to Kinesis", configClass=KinesisSinkConfig.class)
public class KinesisSink
extends AbstractAwsConnector
implements Sink<byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisSink.class);
    // Producer client; constructed in open() and flushed/destroyed in close().
    private KinesisProducer kinesisProducer;
    // Sink settings loaded from the connector config map in open().
    private KinesisSinkConfig kinesisSinkConfig;
    // Name of the target stream, copied from the config in open().
    private String streamName;
    // Partition key used by write() when a record carries neither a key nor a topic name.
    private static final String defaultPartitionedKey = "default";
    // Upper bound on partition-key length; write() truncates keys longer than 256 characters.
    private static final int maxPartitionedKeyLength = 256;
    // Context handed to open(); metrics are recorded through it whenever it is non-null.
    private SinkContext sinkContext;
    // Single-threaded scheduler on which failed sends are re-submitted after a backoff delay.
    private ScheduledExecutorService scheduledExecutor;
    // Integer stand-ins for boolean; these are the two values stored in previousPublishFailed.
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    // Set to 1 by a failed send and back to 0 by a successful one (see ProducerSendCallback).
    // write() rejects new records while it is 1 and the config asks for ordering to be retained.
    private volatile int previousPublishFailed = 0;
    // Atomic updater over previousPublishFailed; open() uses it to reset the flag to 0.
    private static final AtomicIntegerFieldUpdater<KinesisSink> IS_PUBLISH_FAILED = AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, "previousPublishFailed");
    // Metric names passed to SinkContext.recordMetric:
    // records accepted by write(), their byte count, and per-record success / failure outcomes.
    public static final String METRICS_TOTAL_INCOMING = "_kinesis_total_incoming_";
    public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_";
    public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_";
    public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_";

    /**
     * Hands one record to the Kinesis producer and registers {@code producerSendCallback} to be
     * notified of the outcome.
     *
     * <p>The callback is attached with a direct executor, so it runs on whichever thread completes
     * the producer's future. This method is called once from {@code write} and again, with the same
     * callback instance, for every retry scheduled by the callback's failure handler.
     *
     * @param producerSendCallback holds the partition key and payload to send and receives the result
     */
    private void sendUserRecord(ProducerSendCallback producerSendCallback) {
        // A ByteBuffer is a stateful cursor. Because the same callback (and therefore the same
        // buffer) is re-sent on retry, a producer that consumes the buffer with relative reads
        // would find nothing left on the second attempt. Sending a duplicate, which shares the
        // bytes but has its own position and limit, leaves the stored buffer untouched.
        // NOTE(review): the producer is believed to copy via a position-advancing read; confirm
        // against the KPL version in use. The duplicate is harmless either way.
        ByteBuffer stored = producerSendCallback.data;
        ByteBuffer payload = stored == null ? null : stored.duplicate();
        Futures.addCallback(
                this.kinesisProducer.addUserRecord(this.streamName, producerSendCallback.partitionedKey, payload),
                producerSendCallback,
                MoreExecutors.directExecutor());
    }

    /**
     * Publishes one Pulsar record to the configured Kinesis stream.
     *
     * <p>The partition key is the record key, falling back to the topic name and then to the
     * default key; keys longer than 256 characters are truncated to exactly 256. The payload is
     * built by {@code createKinesisMessage} according to the configured message format and sent
     * asynchronously; the record is acknowledged later by the send callback.
     *
     * @param record the record to publish
     * @throws IllegalStateException if ordering must be retained and an earlier publish has failed
     *         and not yet succeeded on retry
     * @throws Exception declared by the sink contract
     */
    public void write(Record<byte[]> record) throws Exception {
        // 1 == TRUE: an earlier send failed and has not succeeded since. Refusing new records here
        // keeps later messages from overtaking the failed one.
        if (this.kinesisSinkConfig.isRetainOrdering() && this.previousPublishFailed == 1) {
            LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName, record.getRecordSequence());
            throw new IllegalStateException("kinesis queue has publish failure");
        }

        String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
        // 256 is the maximum partition-key length. The previous code cut over-long keys to 255
        // characters while letting a 256-character key through unchanged; truncate to the limit.
        if (partitionedKey.length() > 256) {
            partitionedKey = partitionedKey.substring(0, 256);
        }

        ByteBuffer data = KinesisSink.createKinesisMessage(this.kinesisSinkConfig.getMessageFormat(), record);
        // Measure the payload before handing the buffer over, since sending may move its position.
        // remaining() counts only the bytes that make up the message, whereas array().length is
        // the size of the whole backing array (which can be larger than the message) and throws
        // for buffers that expose no array.
        int payloadSize = data.remaining();

        this.sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), partitionedKey, data));

        if (this.sinkContext != null) {
            this.sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1.0);
            this.sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, (double) payloadSize);
        }
        if (LOG.isDebugEnabled()) {
            // Log the size of what was actually sent; this also avoids dereferencing the raw
            // record value, which is not guaranteed to be non-null for every message format.
            LOG.debug("Published message to kinesis stream {} with size {}", this.streamName, payloadSize);
        }
    }

    /**
     * Stops the sink: flushes and destroys the Kinesis producer, then shuts down the retry
     * scheduler.
     *
     * <p>The scheduler is shut down in a {@code finally} block so that its worker thread is
     * released and pending retries are cancelled even if the producer throws while closing.
     * Without this the executor created in {@code open} would outlive the sink.
     *
     * @throws IOException declared by the sink contract
     */
    public void close() throws IOException {
        try {
            if (this.kinesisProducer != null) {
                // NOTE(review): flush() is followed immediately by destroy(); confirm whether a
                // blocking flush is wanted here so buffered records are not dropped on shutdown.
                this.kinesisProducer.flush();
                this.kinesisProducer.destroy();
            }
        } finally {
            if (this.scheduledExecutor != null) {
                // Retries target the producer that was just destroyed, so cancel them outright.
                this.scheduledExecutor.shutdownNow();
            }
        }
        LOG.info("Kinesis sink stopped.");
    }

    /**
     * Initializes the sink from the connector configuration.
     *
     * <p>Creates the retry scheduler, loads {@link KinesisSinkConfig} (including secrets) from
     * {@code config}, validates the mandatory settings, builds the producer configuration and the
     * producer itself, and clears the publish-failure flag.
     *
     * @param config raw connector configuration map
     * @param sinkContext context used for secret resolution and, later, metric reporting
     * @throws IllegalArgumentException if the stream name is blank, if both endpoint and region are
     *         blank, or if the credential plugin parameter is blank
     * @throws Exception if loading the configuration or creating the credential provider fails
     */
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        this.kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
        this.sinkContext = sinkContext;

        // Validate required settings, in the same order as before, prior to touching AWS.
        final KinesisSinkConfig sinkConfig = this.kinesisSinkConfig;
        boolean hasStreamName = StringUtils.isNotBlank(sinkConfig.getAwsKinesisStreamName());
        Preconditions.checkArgument(hasStreamName, "empty kinesis-stream name");
        boolean hasEndpointOrRegion = StringUtils.isNotBlank(sinkConfig.getAwsEndpoint())
                || StringUtils.isNotBlank(sinkConfig.getAwsRegion());
        Preconditions.checkArgument(hasEndpointOrRegion, "Either the aws-end-point or aws-region must be set");
        boolean hasCredentialParam = StringUtils.isNotBlank(sinkConfig.getAwsCredentialPluginParam());
        Preconditions.checkArgument(hasCredentialParam, "empty aws-credential param");

        // Producer settings: pooled threading with four threads, one record per collection.
        KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration();
        kinesisConfig.setKinesisEndpoint(sinkConfig.getAwsEndpoint());
        kinesisConfig.setRegion(sinkConfig.getAwsRegion());
        kinesisConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
        kinesisConfig.setThreadPoolSize(4);
        kinesisConfig.setCollectionMaxCount(1L);

        AWSCredentialsProvider credentialsProvider = this
                .createCredentialProvider(sinkConfig.getAwsCredentialPluginName(), sinkConfig.getAwsCredentialPluginParam())
                .getCredentialProvider();
        kinesisConfig.setCredentialsProvider(credentialsProvider);

        this.streamName = sinkConfig.getAwsKinesisStreamName();
        this.kinesisProducer = new KinesisProducer(kinesisConfig);
        // Start with a clean slate: no publish has failed yet.
        IS_PUBLISH_FAILED.set(this, 0);

        String configDump = ReflectionToStringBuilder.toString(kinesisConfig, ToStringStyle.SHORT_PREFIX_STYLE);
        LOG.info("Kinesis sink started. {}", configDump);
    }

    /**
     * Builds the Kinesis payload for a record in the requested message format.
     *
     * <ul>
     *   <li>{@code FULL_MESSAGE_IN_JSON}: the record serialized to JSON, encoded as UTF-8.</li>
     *   <li>{@code FULL_MESSAGE_IN_FB}: the record serialized to a FlatBuffer.</li>
     *   <li>anything else (including {@code null}): the record value bytes, wrapped as-is.</li>
     * </ul>
     *
     * @param msgFormat the configured message format
     * @param record the record to serialize
     * @return a buffer containing the payload to send
     */
    public static ByteBuffer createKinesisMessage(KinesisSinkConfig.MessageFormat msgFormat, Record<byte[]> record) {
        if (KinesisSinkConfig.MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
            // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
            // which would make the bytes on the stream depend on where the connector runs.
            String json = Utils.serializeRecordToJson(record);
            return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        }
        if (KinesisSinkConfig.MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
            return Utils.serializeRecordToFlatBuffer(record);
        }
        return ByteBuffer.wrap(record.getValue());
    }

    private static final class ProducerSendCallback
    implements FutureCallback<UserRecordResult> {
        private Record<byte[]> resultContext;
        private long startTime = 0L;
        private final Recycler.Handle<ProducerSendCallback> recyclerHandle;
        private KinesisSink kinesisSink;
        private Backoff backoff;
        private String partitionedKey;
        private ByteBuffer data;
        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>(){

            protected ProducerSendCallback newObject(Recycler.Handle<ProducerSendCallback> handle) {
                return new ProducerSendCallback(handle);
            }
        };

        private ProducerSendCallback(Recycler.Handle<ProducerSendCallback> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer data) {
            ProducerSendCallback sendCallback = (ProducerSendCallback)RECYCLER.get();
            sendCallback.resultContext = resultContext;
            sendCallback.kinesisSink = kinesisSink;
            sendCallback.startTime = startTime;
            sendCallback.partitionedKey = partitionedKey;
            sendCallback.data = data;
            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && sendCallback.backoff == null) {
                sendCallback.backoff = new Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), TimeUnit.MILLISECONDS, kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), TimeUnit.MILLISECONDS, 0L, TimeUnit.SECONDS);
            }
            return sendCallback;
        }

        private void recycle() {
            this.resultContext = null;
            this.kinesisSink = null;
            this.startTime = 0L;
            if (this.backoff != null) {
                this.backoff.reset();
            }
            this.partitionedKey = null;
            this.data = null;
            this.recyclerHandle.recycle((Object)this);
        }

        public void onSuccess(UserRecordResult result) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully published message for {}-{} with latency {}", new Object[]{this.kinesisSink.streamName, result.getShardId(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.startTime)});
            }
            if (this.kinesisSink.sinkContext != null) {
                this.kinesisSink.sinkContext.recordMetric(KinesisSink.METRICS_TOTAL_SUCCESS, 1.0);
            }
            this.kinesisSink.previousPublishFailed = 0;
            this.resultContext.ack();
            this.recycle();
        }

        public void onFailure(Throwable exception) {
            if (exception instanceof UserRecordFailedException) {
                UserRecordFailedException failedException = (UserRecordFailedException)exception;
                StringBuffer stringBuffer = new StringBuffer();
                failedException.getResult().getAttempts().forEach(attempt -> stringBuffer.append(String.format("errorMessage:%s, errorCode:%s, delay:%d, duration:%d;", attempt.getErrorMessage(), attempt.getErrorCode(), attempt.getDelay(), attempt.getDuration())));
                LOG.error("[{}] Failed to published message for replicator of {}-{}: Attempts:{}", new Object[]{this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), stringBuffer.toString()});
            } else if (StringUtils.isEmpty((CharSequence)exception.getMessage())) {
                LOG.error("[{}] Failed to published message for replicator of {}-{}", new Object[]{this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), exception});
            } else {
                LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", new Object[]{this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), exception.getMessage()});
            }
            this.kinesisSink.previousPublishFailed = 1;
            if (this.kinesisSink.sinkContext != null) {
                this.kinesisSink.sinkContext.recordMetric(KinesisSink.METRICS_TOTAL_FAILURE, 1.0);
            }
            if (this.backoff != null) {
                long nextDelay = this.backoff.next();
                LOG.info("[{}] Retry to publish message for replicator of {}-{} after {} ms.", new Object[]{this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), nextDelay});
                this.kinesisSink.scheduledExecutor.schedule(() -> this.kinesisSink.sendUserRecord(this), nextDelay, TimeUnit.MICROSECONDS);
            } else {
                this.recycle();
            }
        }
    }
}

