/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io;

import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import net.jodah.failsafe.RetryPolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PulsarIOTestRunner {
    private static final Logger log = LoggerFactory.getLogger(PulsarIOTestRunner.class);
    final Duration ONE_MINUTE = Duration.ofMinutes(1L);
    final Duration TEN_SECONDS = Duration.ofSeconds(10L);
    protected final RetryPolicy statusRetryPolicy = new RetryPolicy().withMaxDuration(this.ONE_MINUTE).withDelay(this.TEN_SECONDS).onRetry(e -> log.error("Retry ... "));
    protected PulsarCluster pulsarCluster;
    protected String functionRuntimeType;

    protected PulsarIOTestRunner(PulsarCluster cluster, String functionRuntimeType) {
        this.pulsarCluster = cluster;
        this.functionRuntimeType = functionRuntimeType;
    }

    protected Schema getSchema(boolean jsonWithEnvelope) {
        if (jsonWithEnvelope) {
            return KeyValueSchemaImpl.kvBytes();
        }
        return KeyValueSchemaImpl.of((Schema)Schema.AUTO_CONSUME(), (Schema)Schema.AUTO_CONSUME(), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED);
    }

    protected <T> void ensureSubscriptionCreated(String inputTopicName, String subscriptionName, Schema<T> inputTopicSchema) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();){
            Consumer ignored = client.newConsumer(inputTopicSchema).topic(new String[]{inputTopicName}).subscriptionType(SubscriptionType.Shared).subscriptionName(subscriptionName).subscribe();
            if (ignored != null) {
                ignored.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Map<String, String> produceMessagesToInputTopic(String inputTopicName, int numMessages, SinkTester<?> tester) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
        try {
            LinkedHashMap<String, String> kvs = new LinkedHashMap<String, String>();
            tester.produceMessage(numMessages, client, inputTopicName, kvs);
            LinkedHashMap<String, String> linkedHashMap = kvs;
            return linkedHashMap;
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }
}

