/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sinks;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.collect.ImmutableMap;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import org.testng.Assert;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisSinkTester
extends SinkTester<LocalStackContainer> {
    private static final Logger log = LoggerFactory.getLogger(KinesisSinkTester.class);
    private static final String NAME = "kinesis";
    private static final int LOCALSTACK_SERVICE_PORT = 4566;
    public static final String STREAM_NAME = "my-stream-1";
    public static final ObjectReader READER = ObjectMapperFactory.getThreadLocal().reader();
    private final boolean withSchema;
    private KinesisAsyncClient client;

    public KinesisSinkTester(boolean withSchema) {
        super(NAME, SinkTester.SinkType.KINESIS);
        this.withSchema = withSchema;
        this.sinkConfig.put("awsKinesisStreamName", STREAM_NAME);
        this.sinkConfig.put("awsRegion", "us-east-1");
        this.sinkConfig.put("awsCredentialPluginParam", "{\"accessKey\":\"access\",\"secretKey\":\"secret\"}");
        if (withSchema) {
            this.sinkConfig.put("messageFormat", "FULL_MESSAGE_IN_JSON_EXPAND_VALUE");
        }
    }

    @Override
    public Schema<?> getInputTopicSchema() {
        if (this.withSchema) {
            return Schema.AUTO_CONSUME();
        }
        return Schema.STRING;
    }

    @Override
    public void prepareSink() throws Exception {
        LocalStackContainer localStackContainer = (LocalStackContainer)this.getServiceContainer();
        URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
        this.sinkConfig.put("awsEndpoint", NAME);
        this.sinkConfig.put("awsEndpointPort", 4566);
        this.sinkConfig.put("skipCertificateValidation", true);
        this.client = (KinesisAsyncClient)((KinesisAsyncClientBuilder)((KinesisAsyncClientBuilder)((KinesisAsyncClientBuilder)KinesisAsyncClient.builder().credentialsProvider(() -> AwsBasicCredentials.create((String)"access", (String)"secret"))).region(Region.US_EAST_1)).endpointOverride(endpointOverride)).build();
        log.info("prepareSink for kinesis: creating stream {}, endpoint {}", (Object)STREAM_NAME, (Object)endpointOverride);
        this.client.createStream((CreateStreamRequest)CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(Integer.valueOf(1)).build()).get();
        log.info("prepareSink for kinesis: created stream {}", (Object)STREAM_NAME);
    }

    @Override
    public void stopServiceContainer() {
        if (this.client != null) {
            this.client.close();
        }
        super.stopServiceContainer();
    }

    @Override
    protected LocalStackContainer createSinkService(PulsarCluster cluster) {
        return new LocalStackContainer(DockerImageName.parse((String)"localstack/localstack:latest")).withServices(new LocalStackContainer.Service[]{LocalStackContainer.Service.KINESIS});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void produceMessage(int numMessages, PulsarClient client, String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
        if (this.withSchema) {
            Schema kvSchema = Schema.KeyValue((Schema)Schema.JSON(SimplePojo.class), (Schema)Schema.AVRO(SimplePojo.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED);
            Producer producer = client.newProducer(kvSchema).topic(inputTopicName).create();
            try {
                for (int i = 0; i < numMessages; ++i) {
                    String key = String.valueOf(i);
                    kvs.put(key, key);
                    SimplePojo keyPojo = new SimplePojo("f1_" + i, "f2_" + i, Arrays.asList(i, i + 1), new HashSet<Long>(Arrays.asList(i)), (Map<String, String>)ImmutableMap.of((Object)("map1_k_" + i), (Object)("map1_kv_" + i)));
                    SimplePojo valuePojo = new SimplePojo(String.valueOf(i), "v2_" + i, Arrays.asList(i, i + 1), new HashSet<Long>(Arrays.asList(i)), (Map<String, String>)ImmutableMap.of((Object)("map1_v_" + i), (Object)("map1_vv_" + i)));
                    producer.newMessage().value((Object)new KeyValue((Object)keyPojo, (Object)valuePojo)).send();
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        Producer producer = client.newProducer(Schema.STRING).topic(inputTopicName).create();
        try {
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                String value = "value-" + i;
                kvs.put(key, value);
                producer.newMessage().key(key).value((Object)value).send();
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        Awaitility.await().untilAsserted(() -> this.internalValidateSinkResult(kvs));
    }

    private void internalValidateSinkResult(Map<String, String> kvs) {
        String shardId = ((Shard)((ListShardsResponse)this.client.listShards((ListShardsRequest)ListShardsRequest.builder().streamName(STREAM_NAME).build()).get()).shards().get(0)).shardId();
        String iterator = ((GetShardIteratorResponse)this.client.getShardIterator((GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(STREAM_NAME).shardId(shardId).shardIteratorType(ShardIteratorType.TRIM_HORIZON).build()).get()).shardIterator();
        LinkedHashMap<String, String> actualKvs = new LinkedHashMap<String, String>();
        this.addMoreRecords(actualKvs, iterator);
        Assert.assertEquals(actualKvs, kvs);
    }

    private void parseRecordData(Map<String, String> actualKvs, String data, String partitionKey) {
        if (this.withSchema) {
            JsonNode payload = READER.readTree(data).at("/payload");
            String i = payload.at("/value/field1").asText();
            Assert.assertEquals((String)payload.at("/value/field2").asText(), (String)("v2_" + i));
            Assert.assertEquals((String)payload.at("/key/field1").asText(), (String)("f1_" + i));
            Assert.assertEquals((String)payload.at("/key/field2").asText(), (String)("f2_" + i));
            actualKvs.put(i, i);
        } else {
            actualKvs.put(partitionKey, data);
        }
    }

    private void addMoreRecords(Map<String, String> actualKvs, String iterator) {
        GetRecordsResponse response;
        ArrayList<KinesisClientRecord> aggRecords = new ArrayList<KinesisClientRecord>();
        do {
            GetRecordsRequest request;
            if ((response = (GetRecordsResponse)this.client.getRecords(request = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(iterator).build()).get()).hasRecords()) {
                for (Record record : response.records()) {
                    try {
                        String data = record.data().asString(StandardCharsets.UTF_8);
                        this.parseRecordData(actualKvs, data, record.partitionKey());
                    }
                    catch (UncheckedIOException e) {
                        aggRecords.add(KinesisClientRecord.fromRecord((Record)record));
                    }
                }
            }
            iterator = response.nextShardIterator();
        } while (response.millisBehindLatest() != 0L);
        for (KinesisClientRecord record : new AggregatorUtil().deaggregate(aggRecords)) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            this.parseRecordData(actualKvs, data, record.partitionKey());
        }
    }

    @Override
    public void close() throws Exception {
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

    public static final class SimplePojo {
        private String field1;
        private String field2;
        private List<Integer> list1;
        private Set<Long> set1;
        private Map<String, String> map1;

        public String getField1() {
            return this.field1;
        }

        public String getField2() {
            return this.field2;
        }

        public List<Integer> getList1() {
            return this.list1;
        }

        public Set<Long> getSet1() {
            return this.set1;
        }

        public Map<String, String> getMap1() {
            return this.map1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public void setList1(List<Integer> list1) {
            this.list1 = list1;
        }

        public void setSet1(Set<Long> set1) {
            this.set1 = set1;
        }

        public void setMap1(Map<String, String> map1) {
            this.map1 = map1;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SimplePojo)) {
                return false;
            }
            SimplePojo other = (SimplePojo)o;
            String this$field1 = this.getField1();
            String other$field1 = other.getField1();
            if (this$field1 == null ? other$field1 != null : !this$field1.equals(other$field1)) {
                return false;
            }
            String this$field2 = this.getField2();
            String other$field2 = other.getField2();
            if (this$field2 == null ? other$field2 != null : !this$field2.equals(other$field2)) {
                return false;
            }
            List<Integer> this$list1 = this.getList1();
            List<Integer> other$list1 = other.getList1();
            if (this$list1 == null ? other$list1 != null : !((Object)this$list1).equals(other$list1)) {
                return false;
            }
            Set<Long> this$set1 = this.getSet1();
            Set<Long> other$set1 = other.getSet1();
            if (this$set1 == null ? other$set1 != null : !((Object)this$set1).equals(other$set1)) {
                return false;
            }
            Map<String, String> this$map1 = this.getMap1();
            Map<String, String> other$map1 = other.getMap1();
            return !(this$map1 == null ? other$map1 != null : !((Object)this$map1).equals(other$map1));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $field1 = this.getField1();
            result = result * 59 + ($field1 == null ? 43 : $field1.hashCode());
            String $field2 = this.getField2();
            result = result * 59 + ($field2 == null ? 43 : $field2.hashCode());
            List<Integer> $list1 = this.getList1();
            result = result * 59 + ($list1 == null ? 43 : ((Object)$list1).hashCode());
            Set<Long> $set1 = this.getSet1();
            result = result * 59 + ($set1 == null ? 43 : ((Object)$set1).hashCode());
            Map<String, String> $map1 = this.getMap1();
            result = result * 59 + ($map1 == null ? 43 : ((Object)$map1).hashCode());
            return result;
        }

        public String toString() {
            return "KinesisSinkTester.SimplePojo(field1=" + this.getField1() + ", field2=" + this.getField2() + ", list1=" + this.getList1() + ", set1=" + this.getSet1() + ", map1=" + this.getMap1() + ")";
        }

        public SimplePojo(String field1, String field2, List<Integer> list1, Set<Long> set1, Map<String, String> map1) {
            this.field1 = field1;
            this.field2 = field2;
            this.list1 = list1;
            this.set1 = set1;
            this.map1 = map1;
        }
    }
}

