/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.plugin.stream.kinesis.server;

import cloud.localstack.Localstack;
import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
import com.google.common.base.Function;
import java.net.URI;
import java.util.HashMap;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.utils.AttributeMap;

/**
 * {@link StreamDataServerStartable} that runs a Localstack Docker container with only the
 * {@code kinesis} service enabled and creates Kinesis streams ("topics") on it.
 *
 * <p>Typical usage is {@link #init(Properties)} followed by {@link #start()}, then
 * {@link #createTopic(String, Properties)}, and finally {@link #stop()}.
 */
public class KinesisDataServerStartable implements StreamDataServerStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataServerStartable.class);

    /** Key in the topic properties holding the number of shards for a new stream. */
    public static final String NUM_SHARDS_PROPERTY = "numShards";
    public static final String DEFAULT_REGION = "us-east-1";
    public static final String DEFAULT_ACCESS_KEY = "access";
    public static final String DEFAULT_SECRET_KEY = "secret";
    /** Edge port used when the server properties do not define {@code port}. */
    public static final String DEFAULT_PORT = "4566";

    // Kept separate from the resolved endpoint so that init() can be called repeatedly with different ports.
    private static final String ENDPOINT_TEMPLATE = "http://localhost:%s";
    private static final String PORT_PROPERTY = "port";
    private static final String ACTIVE_STATUS = "ACTIVE";
    private static final long STATUS_CHECK_INTERVAL_MS = 1000L;
    private static final long STREAM_ACTIVE_TIMEOUT_MS = 30000L;

    private final Localstack _localstackDocker = Localstack.INSTANCE;
    LocalstackDockerConfiguration _dockerConfig;
    Properties _serverProperties;
    private String _localStackKinesisEndpoint = String.format(ENDPOINT_TEMPLATE, DEFAULT_PORT);

    /**
     * Stores the server properties and prepares the Localstack Docker configuration and the local
     * Kinesis endpoint URL.
     *
     * @param props server properties; the optional {@code port} entry selects the Localstack edge port
     *              and falls back to {@link #DEFAULT_PORT}
     */
    public void init(Properties props) {
        _serverProperties = props;
        String edgePort = props.getProperty(PORT_PROPERTY, DEFAULT_PORT);

        HashMap<String, String> environmentVariables = new HashMap<>();
        environmentVariables.put("SERVICES", "kinesis");

        _dockerConfig = LocalstackDockerConfiguration.builder()
            .portEdge(edgePort)
            // presumably picks a free port at or after 4571 - confirm against NetUtils
            .portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571)))
            .imageTag("0.12.15")
            .environmentVariables(environmentVariables)
            .build();
        // Always format from the template: formatting the previously resolved endpoint would keep a stale port.
        _localStackKinesisEndpoint = String.format(ENDPOINT_TEMPLATE, edgePort);
    }

    /** Starts the Localstack container using the configuration built by {@link #init(Properties)}. */
    public void start() {
        _localstackDocker.startup(_dockerConfig);
    }

    /** Stops the Localstack container. */
    public void stop() {
        _localstackDocker.stop();
    }

    /**
     * Creates a Kinesis stream and waits up to 30 seconds for it to report the {@code ACTIVE} status.
     *
     * <p>This is best effort: any failure is logged and not propagated to the caller.
     *
     * @param topic      name of the stream to create
     * @param topicProps stream properties; {@link #NUM_SHARDS_PROPERTY} may be an {@link Integer}, any other
     *                   {@link Number}, or a numeric string
     */
    public void createTopic(final String topic, Properties topicProps) {
        // try-with-resources releases the SDK client once the stream has been created (or creation failed).
        // NOTE(review): the explicitly supplied HTTP client may need to be closed separately - confirm with SDK docs.
        try (KinesisClient kinesisClient = buildKinesisClient()) {
            kinesisClient.createStream(
                CreateStreamRequest.builder().streamName(topic).shardCount(resolveShardCount(topicProps)).build());

            boolean active = waitForCondition(aVoid -> isStreamActive(kinesisClient, topic),
                STATUS_CHECK_INTERVAL_MS, STREAM_ACTIVE_TIMEOUT_MS,
                "Kinesis stream " + topic + " is not created or is not in active state");
            // Only report success when the stream was actually seen as ACTIVE; the failure is logged by the wait.
            if (active) {
                LOGGER.info("Kinesis stream created successfully: {}", topic);
            }
        } catch (Exception e) {
            LOGGER.warn("Error occurred while creating topic: {}", topic, e);
        }
    }

    /** Returns the edge port reported by the Localstack instance. */
    public int getPort() {
        return _localstackDocker.getEdgePort();
    }

    /** Builds a Kinesis client that targets the local endpoint with static credentials and TRUST_ALL_CERTIFICATES. */
    private KinesisClient buildKinesisClient() {
        return KinesisClient.builder()
            .httpClient(new ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults(
                AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()))
            .credentialsProvider(getLocalAWSCredentials())
            .region(Region.of(DEFAULT_REGION))
            .endpointOverride(URI.create(_localStackKinesisEndpoint))
            .build();
    }

    /**
     * Reads the shard count from the topic properties.
     *
     * @return the shard count, or {@code null} when the property is absent
     * @throws NumberFormatException if the value is a string that is not a valid integer
     */
    @Nullable
    private static Integer resolveShardCount(Properties topicProps) {
        Object raw = topicProps.get(NUM_SHARDS_PROPERTY);
        if (raw == null || raw instanceof Integer) {
            return (Integer) raw;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        // Properties commonly carry string values, so accept "2" as well as Integer 2.
        return Integer.valueOf(raw.toString().trim());
    }

    /** Returns true when the stream status is {@code ACTIVE}; lookup failures are logged and count as false. */
    private static boolean isStreamActive(KinesisClient kinesisClient, String topic) {
        try {
            String status = kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(topic).build())
                .streamDescription()
                .streamStatusAsString();
            return ACTIVE_STATUS.equals(status);
        } catch (Exception e) {
            LOGGER.warn("Could not fetch kinesis stream status", e);
            return false;
        }
    }

    /** Returns a provider for the fixed access/secret key pair defined by the default-key constants. */
    private AwsCredentialsProvider getLocalAWSCredentials() {
        return StaticCredentialsProvider.create(AwsBasicCredentials.create(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY));
    }

    /**
     * Polls the condition until it yields {@code true}, the timeout elapses, or the thread is interrupted.
     *
     * @param condition       check to evaluate; it is invoked with a {@code null} argument
     * @param checkIntervalMs pause between two evaluations, in milliseconds
     * @param timeoutMs       maximum total time to wait, in milliseconds
     * @param errorMessage    optional detail appended to the logged error messages
     * @return {@code true} if the condition was met, {@code false} on timeout or interruption
     */
    private static boolean waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
        @Nullable String errorMessage) {
        String errorMessageSuffix = errorMessage != null ? ", error message: " + errorMessage : "";
        long endTime = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < endTime) {
            try {
                if (Boolean.TRUE.equals(condition.apply(null))) {
                    return true;
                }
            } catch (Exception e) {
                LOGGER.error("Caught exception while checking the condition{}", errorMessageSuffix, e);
            }
            // Sleep after every unsuccessful attempt, including failed ones, so a throwing condition cannot spin.
            try {
                Thread.sleep(checkIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller and stop waiting.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while checking the condition{}", errorMessageSuffix, e);
                return false;
            }
        }
        LOGGER.error("Failed to meet condition in {}ms{}", timeoutMs, errorMessageSuffix);
        return false;
    }
}

