package org.apache.pulsar.functions.instance;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.gson.reflect.TypeToken;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.apache.pulsar.shade.org.rocksdb.HashSkipListMemTableConfig;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/functions/instance/ContextImpl.class */
public class ContextImpl implements Context, SinkContext, SourceContext {
    // Collaborators handed in through the constructor.
    private InstanceConfig config;
    private Logger logger;
    private PulsarClient pulsarClient;
    private ClassLoader classLoader;
    // No access modifier (package-visible); getInputTopics() and ack() both tolerate null here.
    Consumer inputConsumer;

    // Identity of the message currently being handled; see setCurrentMessageContext().
    private MessageId messageId;
    private String currentTopicName;

    // recordMetric() writes into currentAccumulatedMetrics; resetMetrics() rotates that map
    // into accumulatedMetrics, which is what getMetrics() reports.
    private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics = new ConcurrentHashMap<>();
    private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics = new ConcurrentHashMap<>();

    // Lazily filled by publish(): producers keyed by topic, serializers keyed by SerDe class name.
    private Map<String, Producer> publishProducers = new HashMap<>();
    private Map<String, SerDe> publishSerializers = new HashMap<>();
    // Shared settings for every producer created by publish(); tuned in the constructor.
    private ProducerConfiguration producerConfiguration = new ProducerConfiguration();

    // Null until setStateContext() is called; state/counter methods require it to be non-null.
    private StateContextImpl stateContext;
    // Parsed from the function details' user-config JSON in the constructor.
    private Map<String, Object> userConfigs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/instance/ContextImpl$AccumulatedMetricDatum.class */
    /**
     * Running aggregate (count, sum, max, min) of the values recorded for one metric.
     *
     * <p>No synchronization is performed here; callers that share an instance across threads
     * must coordinate access themselves.
     */
    public class AccumulatedMetricDatum {
        private double count = 0.0d;
        private double sum = 0.0d;
        // Start from the most negative finite double so that any recorded finite value,
        // including negative ones, replaces it. (Double.MIN_VALUE is the smallest POSITIVE
        // double and would wrongly win against every negative sample.)
        private double max = -Double.MAX_VALUE;
        private double min = Double.MAX_VALUE;

        AccumulatedMetricDatum() {
        }

        /**
         * Folds one sample into the aggregate.
         *
         * @param d the sample value
         */
        public void update(double d) {
            this.count += 1.0d;
            this.sum += d;
            if (this.max < d) {
                this.max = d;
            }
            if (this.min > d) {
                this.min = d;
            }
        }

        /** @return number of samples folded in so far */
        public double getCount() {
            return this.count;
        }

        /** @return sum of all samples folded in so far */
        public double getSum() {
            return this.sum;
        }

        /** @return largest sample seen, or {@code -Double.MAX_VALUE} if none was recorded */
        public double getMax() {
            return this.max;
        }

        /** @return smallest sample seen, or {@code Double.MAX_VALUE} if none was recorded */
        public double getMin() {
            return this.min;
        }

        /** @param d new sample count */
        public void setCount(double d) {
            this.count = d;
        }

        /** @param d new sample sum */
        public void setSum(double d) {
            this.sum = d;
        }

        /** @param d new maximum */
        public void setMax(double d) {
            this.max = d;
        }

        /** @param d new minimum */
        public void setMin(double d) {
            this.min = d;
        }
    }

    public ContextImpl(InstanceConfig instanceConfig, Logger logger, PulsarClient pulsarClient, ClassLoader classLoader, Consumer consumer) {
        this.config = instanceConfig;
        this.logger = logger;
        this.pulsarClient = pulsarClient;
        this.classLoader = classLoader;
        this.inputConsumer = consumer;
        this.producerConfiguration.setBlockIfQueueFull(true);
        this.producerConfiguration.setBatchingEnabled(true);
        this.producerConfiguration.setBatchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS);
        this.producerConfiguration.setMaxPendingMessages(HashSkipListMemTableConfig.DEFAULT_BUCKET_COUNT);
        if (instanceConfig.getFunctionDetails().getUserConfig().isEmpty()) {
            this.userConfigs = new HashMap();
        } else {
            this.userConfigs = (Map) new Gson().fromJson(instanceConfig.getFunctionDetails().getUserConfig(), new TypeToken<Map<String, Object>>() { // from class: org.apache.pulsar.functions.instance.ContextImpl.1
            }.getType());
        }
    }

    /**
     * Records which message is currently being handled.
     *
     * @param messageId id later exposed, as bytes, by {@link #getMessageId()}
     * @param str topic name later returned by {@link #getCurrentMessageTopicName()}
     */
    public void setCurrentMessageContext(MessageId messageId, String str) {
        this.currentTopicName = str;
        this.messageId = messageId;
    }

    /**
     * @return the byte-array form of the message id last passed to
     *         {@link #setCurrentMessageContext(MessageId, String)}
     */
    @Override // org.apache.pulsar.functions.api.Context
    public byte[] getMessageId() {
        final MessageId current = this.messageId;
        return current.toByteArray();
    }

    /**
     * @return the topic name last passed to
     *         {@link #setCurrentMessageContext(MessageId, String)}
     */
    @Override // org.apache.pulsar.functions.api.Context
    public String getCurrentMessageTopicName() {
        final String topic = this.currentTopicName;
        return topic;
    }

    /**
     * Lists the topics the input consumer reads from.
     *
     * @return an empty list when there is no input consumer; the consumer's own topic
     *         collection when it is a {@code MultiTopicsConsumerImpl}; otherwise a
     *         one-element list holding the consumer's single topic
     */
    @Override // org.apache.pulsar.functions.api.Context
    public Collection<String> getInputTopics() {
        if (this.inputConsumer == null) {
            return new LinkedList();
        }
        if (this.inputConsumer instanceof MultiTopicsConsumerImpl) {
            return ((MultiTopicsConsumerImpl) this.inputConsumer).getTopics();
        }
        return Arrays.asList(this.inputConsumer.getTopic());
    }

    /** @return the topic configured on the function details' sink */
    @Override // org.apache.pulsar.functions.api.Context
    public String getOutputTopic() {
        final String sinkTopic = this.config.getFunctionDetails().getSink().getTopic();
        return sinkTopic;
    }

    /** @return the SerDe class name configured on the function details' sink */
    @Override // org.apache.pulsar.functions.api.Context
    public String getOutputSerdeClassName() {
        final String sinkSerDe = this.config.getFunctionDetails().getSink().getSerDeClassName();
        return sinkSerDe;
    }

    /** @return the tenant recorded in the function details */
    @Override // org.apache.pulsar.functions.api.Context
    public String getTenant() {
        final String tenant = this.config.getFunctionDetails().getTenant();
        return tenant;
    }

    /** @return the namespace recorded in the function details */
    @Override // org.apache.pulsar.functions.api.Context
    public String getNamespace() {
        final String namespace = this.config.getFunctionDetails().getNamespace();
        return namespace;
    }

    /** @return the name recorded in the function details */
    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionName() {
        final String functionName = this.config.getFunctionDetails().getName();
        return functionName;
    }

    /** @return the string form of the function id held by the instance config */
    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionId() {
        final String functionId = this.config.getFunctionId().toString();
        return functionId;
    }

    /** @return the string form of the instance id held by the instance config */
    @Override // org.apache.pulsar.functions.api.Context
    public String getInstanceId() {
        final String instanceId = this.config.getInstanceId().toString();
        return instanceId;
    }

    /** @return the function version held by the instance config */
    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionVersion() {
        final String version = this.config.getFunctionVersion();
        return version;
    }

    /** @return the logger supplied to the constructor */
    @Override // org.apache.pulsar.functions.api.Context
    public Logger getLogger() {
        final Logger instanceLogger = this.logger;
        return instanceLogger;
    }

    /**
     * Looks up one user-config entry.
     *
     * @param str the config key
     * @return the mapped value, or an empty {@code Optional} when the key is absent or
     *         mapped to null
     */
    @Override // org.apache.pulsar.functions.api.Context
    public Optional<Object> getUserConfigValue(String str) {
        final Object configured = this.userConfigs.get(str);
        return Optional.ofNullable(configured);
    }

    /**
     * Looks up one user-config entry, falling back to a caller-supplied default.
     *
     * @param str the config key
     * @param obj value returned when {@link #getUserConfigValue(String)} is empty
     * @return the configured value if present, otherwise {@code obj}
     */
    @Override // org.apache.pulsar.functions.api.Context
    public Object getUserConfigValueOrDefault(String str, Object obj) {
        final Optional<Object> configured = getUserConfigValue(str);
        if (configured.isPresent()) {
            return configured.get();
        }
        return obj;
    }

    /**
     * @return the user-config map built in the constructor; this is the internal instance,
     *         not a copy
     */
    @Override // org.apache.pulsar.functions.api.Context
    public Map<String, Object> getUserConfigMap() {
        final Map<String, Object> configs = this.userConfigs;
        return configs;
    }

    /**
     * Guards the state and counter operations: fails the precondition check with the
     * message "State is not enabled." when no state context has been set.
     */
    private void ensureStateEnabled() {
        final boolean stateAvailable = this.stateContext != null;
        Preconditions.checkState(stateAvailable, "State is not enabled.");
    }

    /**
     * Increments a counter held in the state context.
     *
     * @param str counter key
     * @param j amount to add
     * @throws RuntimeException wrapping any exception raised by the state context
     */
    @Override // org.apache.pulsar.functions.api.Context
    public void incrCounter(String str, long j) {
        // Checked outside the try so a missing state context is not re-wrapped below.
        ensureStateEnabled();
        try {
            this.stateContext.incr(str, j);
        } catch (Exception cause) {
            final String detail = "Failed to increment key '" + str + "' by amount '" + j + "'";
            throw new RuntimeException(detail, cause);
        }
    }

    /**
     * Reads a counter held in the state context.
     *
     * @param str counter key
     * @return the amount the state context reports for the key
     * @throws RuntimeException wrapping any exception raised by the state context; the
     *         original exception is kept as the cause
     */
    @Override // org.apache.pulsar.functions.api.Context
    public long getCounter(String str) {
        // Checked outside the try so a missing state context is not re-wrapped below.
        ensureStateEnabled();
        try {
            return this.stateContext.getAmount(str);
        } catch (Exception e) {
            // Chain the cause so the underlying failure is not lost (matches incrCounter).
            throw new RuntimeException("Failed to retrieve counter from key '" + str + "'", e);
        }
    }

    /**
     * Stores a value in the state context.
     *
     * @param str state key
     * @param byteBuffer value to store
     * @throws RuntimeException wrapping any exception raised by the state context; the
     *         original exception is kept as the cause
     */
    @Override // org.apache.pulsar.functions.api.Context
    public void putState(String str, ByteBuffer byteBuffer) {
        // Checked outside the try so a missing state context is not re-wrapped below.
        ensureStateEnabled();
        try {
            this.stateContext.put(str, byteBuffer);
        } catch (Exception e) {
            // Chain the cause so the underlying failure is not lost (matches incrCounter).
            throw new RuntimeException("Failed to update the state value for key '" + str + "'", e);
        }
    }

    /**
     * Reads a value from the state context.
     *
     * @param str state key
     * @return whatever the state context returns for the key
     * @throws RuntimeException wrapping any exception raised by the state context; the
     *         original exception is kept as the cause
     */
    @Override // org.apache.pulsar.functions.api.Context
    public ByteBuffer getState(String str) {
        // Checked outside the try so a missing state context is not re-wrapped below.
        ensureStateEnabled();
        try {
            return this.stateContext.getValue(str);
        } catch (Exception e) {
            // Chain the cause so the underlying failure is not lost (matches incrCounter).
            throw new RuntimeException("Failed to retrieve the state value for key '" + str + "'", e);
        }
    }

    /**
     * Publishes an object to a topic using the default SerDe.
     *
     * @param str destination topic
     * @param o object to publish
     * @return the future produced by {@link #publish(String, Object, String)}
     */
    @Override // org.apache.pulsar.functions.api.Context
    public <O> CompletableFuture<Void> publish(String str, O o) {
        final String defaultSerDeName = DefaultSerDe.class.getName();
        return publish(str, o, defaultSerDeName);
    }

    /**
     * Serializes an object and sends it asynchronously to a topic.
     *
     * <p>A producer is created on first use of each topic and a serializer on first use of
     * each SerDe class name; both are cached for later calls.
     *
     * @param str destination topic
     * @param o object to publish
     * @param str2 SerDe class name; null or empty selects the default SerDe
     * @return a future completing with null once the send completes; if the producer cannot
     *         be created, a future already completed exceptionally with that
     *         {@code PulsarClientException}
     * @throws RuntimeException if the custom SerDe class cannot be found, or if the default
     *         SerDe does not support the object's class
     */
    @Override // org.apache.pulsar.functions.api.Context
    public <O> CompletableFuture<Void> publish(String str, O o, String str2) {
        if (!this.publishProducers.containsKey(str)) {
            try {
                this.publishProducers.put(str, this.pulsarClient.createProducer(str, this.producerConfiguration));
            } catch (PulsarClientException e) {
                // Report producer-creation failure through the returned future, not a throw.
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }

        final String defaultSerDeName = DefaultSerDe.class.getName();
        if (StringUtils.isEmpty(str2)) {
            str2 = defaultSerDeName;
        }

        if (!this.publishSerializers.containsKey(str2)) {
            SerDe serDe;
            if (str2.equals(defaultSerDeName)) {
                if (!DefaultSerDe.IsSupportedType(o.getClass())) {
                    throw new RuntimeException("Default Serializer does not support " + o.getClass());
                }
                // NOTE(review): the default serializer is built from the class of the first
                // object published and then reused for every later default-SerDe publish,
                // whatever its type -- confirm that is intended.
                serDe = new DefaultSerDe(o.getClass());
            } else {
                try {
                    serDe = (SerDe) Reflections.createInstance(str2, Class.forName(str2), this.classLoader);
                } catch (ClassNotFoundException e2) {
                    throw new RuntimeException(e2);
                }
            }
            this.publishSerializers.put(str2, serDe);
        }

        // The serialized payload goes to sendAsync as-is; it is message content and must not
        // be cast to Producer.
        return this.publishProducers.get(str)
                .sendAsync(this.publishSerializers.get(str2).serialize(o))
                .thenApply(ignored -> null);
    }

    /**
     * Acknowledges a message on the input consumer.
     *
     * @param bArr serialized message id
     * @return the consumer's acknowledge future, or an already-completed future when there
     *         is no input consumer
     * @throws RuntimeException if the bytes cannot be parsed into a message id
     */
    @Override // org.apache.pulsar.functions.api.Context
    public CompletableFuture<Void> ack(byte[] bArr) {
        if (this.inputConsumer != null) {
            try {
                final MessageId target = MessageId.fromByteArray(bArr);
                return this.inputConsumer.acknowledgeAsync(target);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Invalid message id to ack", parseFailure);
            }
        }
        // Nothing to acknowledge against.
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Folds one sample into the in-progress aggregate for a metric, creating the aggregate
     * on first use of the name.
     *
     * @param str metric name
     * @param d sample value
     */
    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SinkContext, org.apache.pulsar.io.core.SourceContext
    public void recordMetric(String str, double d) {
        // A single computeIfAbsent hands back the datum directly: no second lookup that could
        // see null if resetMetrics() clears the map in between, and no throwaway allocation
        // when the metric already exists.
        this.currentAccumulatedMetrics.computeIfAbsent(str, key -> new AccumulatedMetricDatum()).update(d);
    }

    /**
     * Builds the metrics report and then rotates the metric maps.
     *
     * <p>The report is taken via {@link #getMetrics()} before {@link #resetMetrics()} runs,
     * so it reflects the map contents as they were prior to this call's rotation.
     *
     * @return the report built before the rotation
     */
    public InstanceCommunication.MetricsData getAndResetMetrics() {
        final InstanceCommunication.MetricsData snapshot = getMetrics();
        resetMetrics();
        return snapshot;
    }

    /**
     * Rotates the metric maps: the reported map is emptied and refilled with the in-progress
     * aggregates, and the in-progress map is then emptied.
     */
    public void resetMetrics() {
        final ConcurrentMap<String, AccumulatedMetricDatum> reported = this.accumulatedMetrics;
        final ConcurrentMap<String, AccumulatedMetricDatum> inProgress = this.currentAccumulatedMetrics;

        reported.clear();
        reported.putAll(inProgress);
        // NOTE(review): the three steps are not atomic as a group; a sample recorded between
        // putAll and clear may be dropped -- confirm callers serialize access.
        inProgress.clear();
    }

    /**
     * Builds a metrics report from the reported-metrics map.
     *
     * <p>One digest (sum, count, max, min) is emitted per metric name.
     *
     * @return the assembled report; it has no entries when the map is empty
     */
    public InstanceCommunication.MetricsData getMetrics() {
        InstanceCommunication.MetricsData.Builder report = InstanceCommunication.MetricsData.newBuilder();
        // Iterate entries so name and datum come from the same map entry, rather than doing a
        // separate get(key) that could return null after a concurrent clear.
        for (Map.Entry<String, AccumulatedMetricDatum> entry : this.accumulatedMetrics.entrySet()) {
            AccumulatedMetricDatum datum = entry.getValue();
            InstanceCommunication.MetricsData.DataDigest.Builder digest = InstanceCommunication.MetricsData.DataDigest.newBuilder();
            digest.setSum(datum.getSum());
            digest.setCount(datum.getCount());
            digest.setMax(datum.getMax());
            // The minimum must come from getMin(), not getMax().
            digest.setMin(datum.getMin());
            report.putMetrics(entry.getKey(), digest.build());
        }
        return report.build();
    }

    /** @return the state context, or null if {@link #setStateContext} has not been called */
    public StateContextImpl getStateContext() {
        final StateContextImpl current = this.stateContext;
        return current;
    }

    /**
     * Installs the state context used by the counter and state operations.
     *
     * @param stateContextImpl the state context; a null value makes those operations fail
     *        their "State is not enabled." precondition
     */
    public void setStateContext(final StateContextImpl stateContextImpl) {
        this.stateContext = stateContextImpl;
    }
}
