/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.plugin.stream.kinesis.server;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.FixedDelayRetryPolicy;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

public class KinesisDataProducer
implements StreamDataProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataProducer.class);
    public static final String ENDPOINT = "endpoint";
    public static final String REGION = "region";
    public static final String ACCESS = "access";
    public static final String SECRET = "secret";
    public static final String NUM_RETRIES = "num_retries";
    public static final String RETRY_DELAY_MILLIS = "retry_delay_millis";
    public static final String DEFAULT_ENDPOINT = "http://localhost:4566";
    public static final String DEFAULT_RETRY_DELAY_MILLIS = "10000";
    public static final String DEFAULT_NUM_RETRIES = "0";
    private KinesisClient _kinesisClient;
    private RetryPolicy _retryPolicy;

    public KinesisDataProducer() {
    }

    public KinesisDataProducer(KinesisClient kinesisClient) {
        this(kinesisClient, (RetryPolicy)new FixedDelayRetryPolicy(Integer.parseInt("01"), (long)Integer.parseInt(DEFAULT_RETRY_DELAY_MILLIS)));
    }

    public KinesisDataProducer(KinesisClient kinesisClient, RetryPolicy retryPolicy) {
        this._kinesisClient = kinesisClient;
        this._retryPolicy = retryPolicy;
    }

    public void init(Properties props) {
        if (this._kinesisClient == null) {
            try {
                KinesisClientBuilder kinesisClientBuilder = props.containsKey(ACCESS) && props.containsKey(SECRET) ? (KinesisClientBuilder)((KinesisClientBuilder)((KinesisClientBuilder)KinesisClient.builder().region(Region.of((String)props.getProperty(REGION)))).credentialsProvider(this.getLocalAWSCredentials(props))).httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder()) : (KinesisClientBuilder)((KinesisClientBuilder)((KinesisClientBuilder)KinesisClient.builder().region(Region.of((String)props.getProperty(REGION)))).credentialsProvider((AwsCredentialsProvider)DefaultCredentialsProvider.create())).httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
                if (props.containsKey(ENDPOINT)) {
                    String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
                    try {
                        kinesisClientBuilder = (KinesisClientBuilder)kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
                    }
                    catch (URISyntaxException e) {
                        throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint, e);
                    }
                }
                this._kinesisClient = (KinesisClient)kinesisClientBuilder.build();
                int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
                long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
                this._retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
            }
            catch (Exception e) {
                LOGGER.warn("Failed to create a kinesis client due to ", (Throwable)e);
                this._kinesisClient = null;
            }
        }
    }

    public void produce(String topic, byte[] payload) {
        try {
            this._retryPolicy.attempt(() -> this.putRecord(topic, null, payload));
        }
        catch (AttemptsExceededException ae) {
            LOGGER.error("Retries exhausted while pushing record in stream {}", (Object)topic);
        }
        catch (RetriableOperationException roe) {
            LOGGER.error("Error occurred while pushing records in stream {}", (Object)topic, (Object)roe);
        }
    }

    public void produce(String topic, byte[] key, byte[] payload) {
        try {
            this._retryPolicy.attempt(() -> this.putRecord(topic, key, payload));
        }
        catch (AttemptsExceededException ae) {
            LOGGER.error("Retries exhausted while pushing record in stream {}", (Object)topic);
        }
        catch (RetriableOperationException roe) {
            LOGGER.error("Error occurred while pushing records in stream {}", (Object)topic, (Object)roe);
        }
    }

    public void produceBatch(String topic, List<byte[]> rows) {
        try {
            this._retryPolicy.attempt(() -> this.putRecordBatch(topic, rows));
        }
        catch (AttemptsExceededException ae) {
            LOGGER.error("Retries exhausted while pushing record in stream {}", (Object)topic);
        }
        catch (RetriableOperationException roe) {
            LOGGER.error("Error occurred while pushing records in stream {}", (Object)topic, (Object)roe);
        }
    }

    public void close() {
        this._kinesisClient.close();
    }

    private AwsCredentialsProvider getLocalAWSCredentials(Properties props) {
        return StaticCredentialsProvider.create((AwsCredentials)AwsBasicCredentials.create((String)props.getProperty(ACCESS), (String)props.getProperty(SECRET)));
    }

    private boolean putRecordBatch(String topic, List<byte[]> rows) {
        try {
            ArrayList<PutRecordsRequestEntry> putRecordsRequestEntries = new ArrayList<PutRecordsRequestEntry>();
            for (byte[] row : rows) {
                putRecordsRequestEntries.add((PutRecordsRequestEntry)PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray((byte[])row)).partitionKey(UUID.randomUUID().toString()).build());
            }
            PutRecordsRequest putRecordsRequest = (PutRecordsRequest)PutRecordsRequest.builder().streamName(topic).records(putRecordsRequestEntries).build();
            PutRecordsResponse putRecordsResponse = this._kinesisClient.putRecords(putRecordsRequest);
            return putRecordsResponse.sdkHttpResponse().isSuccessful();
        }
        catch (Exception e) {
            LOGGER.warn("Exception occurred while pushing record to Kinesis {}", (Object)e.getMessage());
            return false;
        }
    }

    private boolean putRecord(String topic, byte[] key, byte[] payload) {
        try {
            PutRecordRequest.Builder putRecordRequestBuilder = PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray((byte[])payload));
            putRecordRequestBuilder = key != null ? putRecordRequestBuilder.partitionKey(new String(key)) : putRecordRequestBuilder.partitionKey(UUID.randomUUID().toString());
            PutRecordResponse putRecordResponse = this._kinesisClient.putRecord((PutRecordRequest)putRecordRequestBuilder.build());
            return putRecordResponse.sdkHttpResponse().isSuccessful();
        }
        catch (Exception e) {
            LOGGER.warn("Exception occurred while pushing record to Kinesis {}", (Object)e.getMessage());
            return false;
        }
    }
}

