/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.PulsarCluster;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.ProducerConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;

class ContextImpl
implements Context,
SinkContext,
SourceContext,
AutoCloseable {
    private InstanceConfig config;
    private Logger logger;
    private Record<?> record;
    private String defaultPulsarCluster;
    private Map<String, PulsarCluster> externalPulsarClusters;
    private final SecretsProvider secretsProvider;
    private final Map<String, Object> secretsMap;
    @VisibleForTesting
    StateManager stateManager;
    @VisibleForTesting
    DefaultStateStore defaultStateStore;
    private Map<String, Object> userConfigs;
    private ComponentStatsManager statsManager;
    Map<String, String[]> userMetricsLabels = new HashMap<String, String[]>();
    private final String[] metricsLabels;
    private final Summary userMetricsSummary;
    private static final String[] userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
    private final Function.FunctionDetails.ComponentType componentType;

    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, StateManager stateManager) {
        String prefix;
        this.config = config;
        this.logger = logger;
        this.statsManager = statsManager;
        this.externalPulsarClusters = new HashMap<String, PulsarCluster>();
        if (!config.getFunctionDetails().getExternalPulsarsMap().isEmpty()) {
            Map externalPulsarConfig = (Map)new Gson().fromJson(config.getFunctionDetails().getExternalPulsarsMap(), new TypeToken<Map<String, ExternalPulsarConfig>>(){}.getType());
            for (Map.Entry entry : externalPulsarConfig.entrySet()) {
                try {
                    this.externalPulsarClusters.put((String)entry.getKey(), new PulsarCluster(InstanceUtils.createPulsarClient(((ExternalPulsarConfig)entry.getValue()).getServiceURL(), ((ExternalPulsarConfig)entry.getValue()).getAuthConfig()), ProducerConfigUtils.convert((ProducerConfig)((ExternalPulsarConfig)entry.getValue()).getProducerConfig())));
                }
                catch (PulsarClientException ex) {
                    throw new RuntimeException("failed to create pulsar client for external cluster: " + (String)entry.getKey(), ex);
                }
            }
        }
        this.defaultPulsarCluster = "default-" + UUID.randomUUID();
        this.externalPulsarClusters.put(this.defaultPulsarCluster, new PulsarCluster(client, config.getFunctionDetails().getSink().getProducerSpec()));
        this.userConfigs = config.getFunctionDetails().getUserConfig().isEmpty() ? new HashMap<String, Object>() : (Map)new Gson().fromJson(config.getFunctionDetails().getUserConfig(), new TypeToken<Map<String, Object>>(){}.getType());
        this.secretsProvider = secretsProvider;
        this.secretsMap = !StringUtils.isEmpty((CharSequence)config.getFunctionDetails().getSecretsMap()) ? (Map<Object, Object>)new Gson().fromJson(config.getFunctionDetails().getSecretsMap(), new TypeToken<Map<String, Object>>(){}.getType()) : new HashMap<String, Object>();
        this.metricsLabels = metricsLabels;
        switch (componentType) {
            case FUNCTION: {
                prefix = "pulsar_function_";
                break;
            }
            case SINK: {
                prefix = "pulsar_sink_";
                break;
            }
            case SOURCE: {
                prefix = "pulsar_source_";
                break;
            }
            default: {
                throw new RuntimeException("Unknown component type: " + componentType);
            }
        }
        this.userMetricsSummary = (Summary)((Summary.Builder)((Summary.Builder)((Summary.Builder)Summary.build().name(prefix + "user_metric_")).help("User defined metric.")).labelNames(userMetricsLabelNames)).quantile(0.5, 0.01).quantile(0.9, 0.01).quantile(0.99, 0.01).quantile(0.999, 0.01).register(collectorRegistry);
        this.componentType = componentType;
        this.stateManager = stateManager;
        this.defaultStateStore = (DefaultStateStore)stateManager.getStore(config.getFunctionDetails().getTenant(), config.getFunctionDetails().getNamespace(), config.getFunctionDetails().getName());
    }

    public void setCurrentMessageContext(Record<?> record) {
        this.record = record;
    }

    public Record<?> getCurrentRecord() {
        return this.record;
    }

    public Collection<String> getInputTopics() {
        return this.config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
    }

    public String getOutputTopic() {
        return this.config.getFunctionDetails().getSink().getTopic();
    }

    public String getOutputSchemaType() {
        Function.SinkSpec sink = this.config.getFunctionDetails().getSink();
        if (!StringUtils.isEmpty((CharSequence)sink.getSchemaType())) {
            return sink.getSchemaType();
        }
        return sink.getSerDeClassName();
    }

    public String getTenant() {
        return this.config.getFunctionDetails().getTenant();
    }

    public String getNamespace() {
        return this.config.getFunctionDetails().getNamespace();
    }

    public String getSinkName() {
        return this.config.getFunctionDetails().getName();
    }

    public String getSourceName() {
        return this.config.getFunctionDetails().getName();
    }

    public String getFunctionName() {
        return this.config.getFunctionDetails().getName();
    }

    public String getFunctionId() {
        return this.config.getFunctionId();
    }

    public int getInstanceId() {
        return this.config.getInstanceId();
    }

    public int getNumInstances() {
        return this.config.getFunctionDetails().getParallelism();
    }

    public String getFunctionVersion() {
        return this.config.getFunctionVersion();
    }

    public Logger getLogger() {
        return this.logger;
    }

    public Optional<Object> getUserConfigValue(String key) {
        Object value = this.userConfigs.getOrDefault(key, null);
        if (value instanceof String && ((String)value).startsWith("$")) {
            try {
                String actualValue = System.getenv(((String)value).substring(1));
                return Optional.ofNullable(actualValue);
            }
            catch (SecurityException ex) {
                throw new RuntimeException("Access to environment variable " + value + " is not allowed.", ex);
            }
        }
        return Optional.ofNullable(value);
    }

    public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
        return this.getUserConfigValue(key).orElse(defaultValue);
    }

    public Map<String, Object> getUserConfigMap() {
        return this.userConfigs;
    }

    public String getSecret(String secretName) {
        if (this.secretsMap.containsKey(secretName)) {
            return this.secretsProvider.provideSecret(secretName, this.secretsMap.get(secretName));
        }
        return null;
    }

    public <S extends StateStore> S getStateStore(String name) {
        return this.getStateStore(this.config.getFunctionDetails().getTenant(), this.config.getFunctionDetails().getNamespace(), name);
    }

    public <S extends StateStore> S getStateStore(String tenant, String ns, String name) {
        return (S)this.stateManager.getStore(tenant, ns, name);
    }

    private void ensureStateEnabled() {
        Preconditions.checkState((null != this.defaultStateStore ? 1 : 0) != 0, (String)"State %s/%s/%s is not enabled.", (Object)this.config.getFunctionDetails().getTenant(), (Object)this.config.getFunctionDetails().getNamespace(), (Object)this.config.getFunctionDetails().getName());
    }

    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        this.ensureStateEnabled();
        return this.defaultStateStore.incrCounterAsync(key, amount);
    }

    public void incrCounter(String key, long amount) {
        this.ensureStateEnabled();
        this.defaultStateStore.incrCounter(key, amount);
    }

    public CompletableFuture<Long> getCounterAsync(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.getCounterAsync(key);
    }

    public long getCounter(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.getCounter(key);
    }

    public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
        this.ensureStateEnabled();
        return this.defaultStateStore.putAsync(key, value);
    }

    public void putState(String key, ByteBuffer value) {
        this.ensureStateEnabled();
        this.defaultStateStore.put(key, value);
    }

    public CompletableFuture<Void> deleteStateAsync(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.deleteAsync(key);
    }

    public void deleteState(String key) {
        this.ensureStateEnabled();
        this.defaultStateStore.delete(key);
    }

    public CompletableFuture<ByteBuffer> getStateAsync(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.getAsync(key);
    }

    public ByteBuffer getState(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.get(key);
    }

    public <O> CompletableFuture<Void> publish(String topicName, O object) {
        return this.publish(topicName, object, "");
    }

    public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
        return this.publish(topicName, object, this.externalPulsarClusters.get(this.defaultPulsarCluster).getTopicSchema().getSchema(topicName, object, schemaOrSerdeClassName, false));
    }

    public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
        return this.newOutputMessage(this.defaultPulsarCluster, topicName, schema);
    }

    public <O> TypedMessageBuilder<O> newOutputMessage(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
        MessageBuilderImpl messageBuilder = new MessageBuilderImpl();
        TypedMessageBuilder typedMessageBuilder = this.getProducer(pulsarName, topicName, schema).newMessage();
        messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
        return messageBuilder;
    }

    public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
        return this.externalPulsarClusters.get(this.defaultPulsarCluster).getClient().newConsumer(schema);
    }

    public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
        return this.publish(this.defaultPulsarCluster, topicName, object, schema);
    }

    public <O> CompletableFuture<Void> publish(String pulsarName, String topicName, O object, Schema<O> schema) {
        try {
            return this.newOutputMessage(pulsarName, topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
        }
        catch (PulsarClientException e) {
            this.logger.error("Failed to create Producer while doing user publish", (Throwable)e);
            return FutureUtil.failedFuture((Throwable)e);
        }
    }

    public void recordMetric(String metricName, double value) {
        String[] userMetricLabels = this.userMetricsLabels.get(metricName);
        if (userMetricLabels == null) {
            userMetricLabels = Arrays.copyOf(this.metricsLabels, this.metricsLabels.length + 1);
            userMetricLabels[userMetricLabels.length - 1] = metricName;
            ((Summary.Child)this.userMetricsSummary.labels(userMetricLabels)).observe(value);
            this.userMetricsLabels.put(metricName, userMetricLabels);
        } else {
            ((Summary.Child)this.userMetricsSummary.labels(userMetricLabels)).observe(value);
        }
    }

    private <O> Producer<O> getProducer(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
        Producer producer;
        PulsarCluster pulsar = this.externalPulsarClusters.get(pulsarName);
        if (pulsar.getTlPublishProducers() != null) {
            Map<String, Producer<?>> producerMap = pulsar.getTlPublishProducers().get();
            if (producerMap == null) {
                producerMap = new HashMap();
                pulsar.getTlPublishProducers().set(producerMap);
            }
            producer = producerMap.get(topicName);
        } else {
            producer = pulsar.getPublishProducers().get(topicName);
        }
        if (producer == null) {
            Producer newProducer = ((ProducerBuilderImpl)pulsar.getProducerBuilder().clone()).schema(schema).blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).compressionType(CompressionType.LZ4).hashingScheme(HashingScheme.Murmur3_32Hash).messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter((MessageRouter)FunctionResultRouter.of()).sendTimeout(0, TimeUnit.SECONDS).topic(topicName).properties(InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName((String)this.config.getFunctionDetails().getTenant(), (String)this.config.getFunctionDetails().getNamespace(), (String)this.config.getFunctionDetails().getName()), this.config.getInstanceId())).create();
            if (pulsar.getTlPublishProducers() != null) {
                pulsar.getTlPublishProducers().get().put(topicName, newProducer);
            } else {
                Producer existingProducer = pulsar.getPublishProducers().putIfAbsent(topicName, newProducer);
                if (existingProducer != null) {
                    newProducer.close();
                    producer = existingProducer;
                } else {
                    producer = newProducer;
                }
            }
        }
        return producer;
    }

    public Map<String, Double> getAndResetMetrics() {
        Map<String, Double> retval = this.getMetrics();
        this.resetMetrics();
        return retval;
    }

    public void resetMetrics() {
        this.userMetricsSummary.clear();
    }

    public Map<String, Double> getMetrics() {
        HashMap<String, Double> metricsMap = new HashMap<String, Double>();
        for (Map.Entry<String, String[]> userMetricsLabelsEntry : this.userMetricsLabels.entrySet()) {
            String metricName = userMetricsLabelsEntry.getKey();
            String[] labels = userMetricsLabelsEntry.getValue();
            Summary.Child.Value summary = ((Summary.Child)this.userMetricsSummary.labels(labels)).get();
            metricsMap.put(String.format("%s%s_sum", "user_metric_", metricName), summary.sum);
            metricsMap.put(String.format("%s%s_count", "user_metric_", metricName), summary.count);
            for (Map.Entry entry : summary.quantiles.entrySet()) {
                Double quantile = (Double)entry.getKey();
                Double value = (Double)entry.getValue();
                metricsMap.put(String.format("%s%s_%s", "user_metric_", metricName, quantile), value);
            }
        }
        return metricsMap;
    }

    @Override
    public void close() {
        LinkedList<CompletableFuture> futures = new LinkedList<CompletableFuture>();
        for (Map.Entry<String, PulsarCluster> pulsarEntry : this.externalPulsarClusters.entrySet()) {
            PulsarCluster pulsar = pulsarEntry.getValue();
            if (pulsar.getPublishProducers() != null) {
                for (Producer<?> producer : pulsar.getPublishProducers().values()) {
                    futures.add(producer.closeAsync());
                }
            }
            if (pulsar.getTlPublishProducers() == null) continue;
            for (Producer<?> producer : pulsar.getTlPublishProducers().get().values()) {
                futures.add(producer.closeAsync());
            }
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        }
        catch (InterruptedException | ExecutionException e) {
            this.logger.warn("Failed to close producers", (Throwable)e);
        }
    }

    static {
        ContextImpl.userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
    }

    class MessageBuilderImpl<O>
    implements TypedMessageBuilder<O> {
        private TypedMessageBuilder<O> underlyingBuilder;

        MessageBuilderImpl() {
        }

        public MessageId send() throws PulsarClientException {
            try {
                return this.sendAsync().get();
            }
            catch (Exception e) {
                throw PulsarClientException.unwrap((Throwable)e);
            }
        }

        public CompletableFuture<MessageId> sendAsync() {
            return this.underlyingBuilder.sendAsync().whenComplete((result, cause) -> {
                if (null != cause) {
                    ContextImpl.this.statsManager.incrSysExceptions((Throwable)cause);
                    ContextImpl.this.logger.error("Failed to publish to topic with error", cause);
                }
            });
        }

        public TypedMessageBuilder<O> key(String key) {
            this.underlyingBuilder.key(key);
            return this;
        }

        public TypedMessageBuilder<O> keyBytes(byte[] key) {
            this.underlyingBuilder.keyBytes(key);
            return this;
        }

        public TypedMessageBuilder<O> orderingKey(byte[] orderingKey) {
            this.underlyingBuilder.orderingKey(orderingKey);
            return this;
        }

        public TypedMessageBuilder<O> value(O value) {
            this.underlyingBuilder.value(value);
            return this;
        }

        public TypedMessageBuilder<O> property(String name, String value) {
            this.underlyingBuilder.property(name, value);
            return this;
        }

        public TypedMessageBuilder<O> properties(Map<String, String> properties) {
            this.underlyingBuilder.properties(properties);
            return this;
        }

        public TypedMessageBuilder<O> eventTime(long timestamp) {
            this.underlyingBuilder.eventTime(timestamp);
            return this;
        }

        public TypedMessageBuilder<O> sequenceId(long sequenceId) {
            this.underlyingBuilder.sequenceId(sequenceId);
            return this;
        }

        public TypedMessageBuilder<O> replicationClusters(List<String> clusters) {
            this.underlyingBuilder.replicationClusters(clusters);
            return this;
        }

        public TypedMessageBuilder<O> disableReplication() {
            this.underlyingBuilder.disableReplication();
            return this;
        }

        public TypedMessageBuilder<O> loadConf(Map<String, Object> config) {
            this.underlyingBuilder.loadConf(config);
            return this;
        }

        public TypedMessageBuilder<O> deliverAfter(long delay, TimeUnit unit) {
            this.underlyingBuilder.deliverAfter(delay, unit);
            return this;
        }

        public TypedMessageBuilder<O> deliverAt(long timestamp) {
            this.underlyingBuilder.deliverAt(timestamp);
            return this;
        }

        public void setUnderlyingBuilder(TypedMessageBuilder<O> underlyingBuilder) {
            this.underlyingBuilder = underlyingBuilder;
        }
    }
}

