/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.prometheus.client.Summary;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.utils.FunctionRecord;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.PulsarFunctionRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;

class ContextImpl
implements Context,
SinkContext,
SourceContext,
AutoCloseable {
    private InstanceConfig config;
    private Logger logger;
    private Record<?> record;
    private final ClientBuilder clientBuilder;
    private final PulsarClient client;
    private final PulsarAdmin pulsarAdmin;
    private Map<String, Producer<?>> publishProducers;
    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
    private ProducerBuilderImpl<?> producerBuilder;
    private final TopicSchema topicSchema;
    private final SecretsProvider secretsProvider;
    private final Map<String, Object> secretsMap;
    @VisibleForTesting
    StateManager stateManager;
    @VisibleForTesting
    DefaultStateStore defaultStateStore;
    private Map<String, Object> userConfigs;
    private ComponentStatsManager statsManager;
    Map<String, String[]> userMetricsLabels = new HashMap<String, String[]>();
    private final String[] metricsLabels;
    private final Summary userMetricsSummary;
    private final SubscriptionType subscriptionType;
    private static final String[] userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.METRICS_LABEL_NAMES, ComponentStatsManager.METRICS_LABEL_NAMES.length + 1);
    private boolean exposePulsarAdminClientEnabled;
    private List<Consumer<?>> inputConsumers;
    private final Map<TopicName, Consumer> topicConsumers = new ConcurrentHashMap<TopicName, Consumer>();
    private final Function.FunctionDetails.ComponentType componentType;

    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) throws PulsarClientException {
        this.config = config;
        this.logger = logger;
        this.clientBuilder = clientBuilder;
        this.client = client;
        this.pulsarAdmin = pulsarAdmin;
        this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader());
        this.statsManager = statsManager;
        this.producerBuilder = (ProducerBuilderImpl)client.newProducer().blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS);
        boolean useThreadLocalProducers = false;
        Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
        if (producerSpec != null) {
            if (producerSpec.getMaxPendingMessages() != 0) {
                this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
            }
            if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
                this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
            }
            if (producerSpec.getBatchBuilder() != null) {
                if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
                    this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
                } else {
                    this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
                }
            }
            useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
        }
        if (useThreadLocalProducers) {
            this.tlPublishProducers = new ThreadLocal();
        } else {
            this.publishProducers = new ConcurrentHashMap();
        }
        this.userConfigs = config.getFunctionDetails().getUserConfig().isEmpty() ? new HashMap<String, Object>() : (Map)new Gson().fromJson(config.getFunctionDetails().getUserConfig(), new TypeToken<Map<String, Object>>(){}.getType());
        this.secretsProvider = secretsProvider;
        this.secretsMap = !StringUtils.isEmpty((CharSequence)config.getFunctionDetails().getSecretsMap()) ? (Map<Object, Object>)new Gson().fromJson(config.getFunctionDetails().getSecretsMap(), new TypeToken<Map<String, Object>>(){}.getType()) : new HashMap<String, Object>();
        this.metricsLabels = metricsLabels;
        String prefix = switch (componentType) {
            case Function.FunctionDetails.ComponentType.FUNCTION -> "pulsar_function_";
            case Function.FunctionDetails.ComponentType.SINK -> "pulsar_sink_";
            case Function.FunctionDetails.ComponentType.SOURCE -> "pulsar_source_";
            default -> throw new RuntimeException("Unknown component type: " + componentType);
        };
        this.userMetricsSummary = collectorRegistry.registerIfNotExist(prefix + "user_metric_", ((Summary.Builder)((Summary.Builder)((Summary.Builder)Summary.build().name(prefix + "user_metric_")).help("User defined metric.")).labelNames(userMetricsLabelNames)).quantile(0.5, 0.01).quantile(0.9, 0.01).quantile(0.99, 0.01).quantile(0.999, 0.01).create());
        this.componentType = componentType;
        this.stateManager = stateManager;
        this.defaultStateStore = (DefaultStateStore)stateManager.getStore(config.getFunctionDetails().getTenant(), config.getFunctionDetails().getNamespace(), config.getFunctionDetails().getName());
        this.exposePulsarAdminClientEnabled = config.isExposePulsarAdminClientEnabled();
        Function.SourceSpec sourceSpec = config.getFunctionDetails().getSource();
        switch (sourceSpec.getSubscriptionType()) {
            case FAILOVER: {
                this.subscriptionType = SubscriptionType.Failover;
                break;
            }
            case KEY_SHARED: {
                this.subscriptionType = SubscriptionType.Key_Shared;
                break;
            }
            default: {
                this.subscriptionType = SubscriptionType.Shared;
            }
        }
    }

    public void setCurrentMessageContext(Record<?> record) {
        this.record = record;
    }

    public Record<?> getCurrentRecord() {
        return new PulsarFunctionRecord(this.record, this.config.getFunctionDetails());
    }

    public Collection<String> getInputTopics() {
        return this.config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
    }

    public SinkConfig getSinkConfig() {
        return SinkConfigUtils.convertFromDetails((Function.FunctionDetails)this.config.getFunctionDetails());
    }

    public String getOutputTopic() {
        return this.config.getFunctionDetails().getSink().getTopic();
    }

    public SourceConfig getSourceConfig() {
        return SourceConfigUtils.convertFromDetails((Function.FunctionDetails)this.config.getFunctionDetails());
    }

    public String getOutputSchemaType() {
        Function.SinkSpec sink = this.config.getFunctionDetails().getSink();
        if (!StringUtils.isEmpty((CharSequence)sink.getSchemaType())) {
            return sink.getSchemaType();
        }
        return sink.getSerDeClassName();
    }

    public String getTenant() {
        return this.config.getFunctionDetails().getTenant();
    }

    public String getNamespace() {
        return this.config.getFunctionDetails().getNamespace();
    }

    public String getSinkName() {
        return this.config.getFunctionDetails().getName();
    }

    public String getSourceName() {
        return this.config.getFunctionDetails().getName();
    }

    public String getFunctionName() {
        return this.config.getFunctionDetails().getName();
    }

    public String getFunctionId() {
        return this.config.getFunctionId();
    }

    public int getInstanceId() {
        return this.config.getInstanceId();
    }

    public int getNumInstances() {
        return this.config.getFunctionDetails().getParallelism();
    }

    public String getFunctionVersion() {
        return this.config.getFunctionVersion();
    }

    public Logger getLogger() {
        return this.logger;
    }

    public Optional<Object> getUserConfigValue(String key) {
        Object value = this.userConfigs.getOrDefault(key, null);
        if (value instanceof String && ((String)value).startsWith("$")) {
            try {
                String actualValue = System.getenv(((String)value).substring(1));
                return Optional.ofNullable(actualValue);
            }
            catch (SecurityException ex) {
                throw new RuntimeException("Access to environment variable " + value + " is not allowed.", ex);
            }
        }
        return Optional.ofNullable(value);
    }

    public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
        return this.getUserConfigValue(key).orElse(defaultValue);
    }

    public Map<String, Object> getUserConfigMap() {
        return this.userConfigs;
    }

    public String getSecret(String secretName) {
        if (this.secretsMap.containsKey(secretName)) {
            return this.secretsProvider.provideSecret(secretName, this.secretsMap.get(secretName));
        }
        return null;
    }

    public PulsarAdmin getPulsarAdmin() {
        if (this.exposePulsarAdminClientEnabled) {
            return this.pulsarAdmin;
        }
        throw new IllegalStateException("PulsarAdmin is not enabled in function worker");
    }

    public <T extends StateStore> T getStateStore(String name) {
        return this.getStateStore(this.config.getFunctionDetails().getTenant(), this.config.getFunctionDetails().getNamespace(), name);
    }

    public <T extends StateStore> T getStateStore(String tenant, String ns, String name) {
        return (T)this.stateManager.getStore(tenant, ns, name);
    }

    private void ensureStateEnabled() {
        Preconditions.checkState((null != this.defaultStateStore ? 1 : 0) != 0, (String)"State %s/%s/%s is not enabled.", (Object)this.config.getFunctionDetails().getTenant(), (Object)this.config.getFunctionDetails().getNamespace(), (Object)this.config.getFunctionDetails().getName());
    }

    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        this.ensureStateEnabled();
        return this.defaultStateStore.incrCounterAsync(key, amount);
    }

    public void incrCounter(String key, long amount) {
        this.ensureStateEnabled();
        this.defaultStateStore.incrCounter(key, amount);
    }

    public CompletableFuture<Long> getCounterAsync(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.getCounterAsync(key);
    }

    public long getCounter(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.getCounter(key);
    }

    public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
        this.ensureStateEnabled();
        return this.defaultStateStore.putAsync(key, value);
    }

    public void putState(String key, ByteBuffer value) {
        this.ensureStateEnabled();
        this.defaultStateStore.put(key, value);
    }

    public CompletableFuture<Void> deleteStateAsync(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.deleteAsync(key);
    }

    public void deleteState(String key) {
        this.ensureStateEnabled();
        this.defaultStateStore.delete(key);
    }

    public CompletableFuture<ByteBuffer> getStateAsync(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.getAsync(key);
    }

    public ByteBuffer getState(String key) {
        this.ensureStateEnabled();
        return this.defaultStateStore.get(key);
    }

    public <T> CompletableFuture<Void> publish(String topicName, T object) {
        return this.publish(topicName, object, "");
    }

    public <T> CompletableFuture<Void> publish(String topicName, T object, String schemaOrSerdeClassName) {
        return this.publish(topicName, object, this.topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
    }

    public <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException {
        MessageBuilderImpl messageBuilder = new MessageBuilderImpl();
        Producer<T> producer = this.getProducer(topicName, schema);
        TypedMessageBuilder typedMessageBuilder = schema != null ? producer.newMessage(schema) : producer.newMessage();
        messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
        return messageBuilder;
    }

    public <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException {
        return this.client.newConsumer(schema);
    }

    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder(Schema<X> schema) {
        return FunctionRecord.from((Context)this, schema);
    }

    public SubscriptionType getSubscriptionType() {
        return this.subscriptionType;
    }

    public <T> CompletableFuture<Void> publish(String topicName, T object, Schema<T> schema) {
        try {
            return this.newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
        }
        catch (PulsarClientException e) {
            this.logger.error("Failed to create Producer while doing user publish", (Throwable)e);
            return FutureUtil.failedFuture((Throwable)e);
        }
    }

    public void recordMetric(String metricName, double value) {
        String[] userMetricLabels = this.userMetricsLabels.get(metricName);
        if (userMetricLabels == null) {
            userMetricLabels = Arrays.copyOf(this.metricsLabels, this.metricsLabels.length + 1);
            userMetricLabels[userMetricLabels.length - 1] = metricName;
            ((Summary.Child)this.userMetricsSummary.labels(userMetricLabels)).observe(value);
            this.userMetricsLabels.put(metricName, userMetricLabels);
        } else {
            ((Summary.Child)this.userMetricsSummary.labels(userMetricLabels)).observe(value);
        }
    }

    public PulsarClient getPulsarClient() {
        return this.client;
    }

    public ClientBuilder getPulsarClientBuilder() {
        return this.clientBuilder;
    }

    private <T> Producer<T> getProducer(String topicName, Schema<T> schema) throws PulsarClientException {
        Producer producer;
        if (this.tlPublishProducers != null) {
            Map<String, Producer<?>> producerMap = this.tlPublishProducers.get();
            if (producerMap == null) {
                producerMap = new HashMap();
                this.tlPublishProducers.set(producerMap);
            }
            producer = producerMap.get(topicName);
        } else {
            producer = this.publishProducers.get(topicName);
        }
        if (producer == null) {
            Producer newProducer = ((ProducerBuilderImpl)this.producerBuilder.clone()).schema(schema).blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).compressionType(CompressionType.LZ4).hashingScheme(HashingScheme.Murmur3_32Hash).messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter((MessageRouter)FunctionResultRouter.of()).sendTimeout(0, TimeUnit.SECONDS).topic(topicName).properties(InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName((String)this.config.getFunctionDetails().getTenant(), (String)this.config.getFunctionDetails().getNamespace(), (String)this.config.getFunctionDetails().getName()), this.config.getInstanceId())).create();
            if (this.tlPublishProducers != null) {
                this.tlPublishProducers.get().put(topicName, newProducer);
            } else {
                Producer existingProducer = this.publishProducers.putIfAbsent(topicName, newProducer);
                if (existingProducer != null) {
                    newProducer.close();
                    producer = existingProducer;
                } else {
                    producer = newProducer;
                }
            }
        }
        return producer;
    }

    public Map<String, Double> getAndResetMetrics() {
        Map<String, Double> retval = this.getMetrics();
        this.resetMetrics();
        return retval;
    }

    public void resetMetrics() {
        this.userMetricsSummary.clear();
    }

    public Map<String, Double> getMetrics() {
        HashMap<String, Double> metricsMap = new HashMap<String, Double>();
        for (Map.Entry<String, String[]> userMetricsLabelsEntry : this.userMetricsLabels.entrySet()) {
            String metricName = userMetricsLabelsEntry.getKey();
            String[] labels = userMetricsLabelsEntry.getValue();
            Summary.Child.Value summary = ((Summary.Child)this.userMetricsSummary.labels(labels)).get();
            metricsMap.put(String.format("%s%s_sum", "user_metric_", metricName), summary.sum);
            metricsMap.put(String.format("%s%s_count", "user_metric_", metricName), summary.count);
            for (Map.Entry entry : summary.quantiles.entrySet()) {
                Double quantile = (Double)entry.getKey();
                Double value = (Double)entry.getValue();
                metricsMap.put(String.format("%s%s_%s", "user_metric_", metricName, quantile), value);
            }
        }
        return metricsMap;
    }

    @Override
    public void close() {
        LinkedList<CompletableFuture> futures = new LinkedList<CompletableFuture>();
        if (this.publishProducers != null) {
            for (Producer<?> producer : this.publishProducers.values()) {
                futures.add(producer.closeAsync());
            }
        }
        if (this.tlPublishProducers != null) {
            for (Producer<?> producer : this.tlPublishProducers.get().values()) {
                futures.add(producer.closeAsync());
            }
        }
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        }
        catch (InterruptedException | ExecutionException e) {
            this.logger.warn("Failed to close producers", (Throwable)e);
        }
    }

    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
        Consumer<?> consumer = this.getConsumer(topic, partition);
        consumer.seek(messageId);
    }

    public void pause(String topic, int partition) throws PulsarClientException {
        this.getConsumer(topic, partition).pause();
    }

    public void resume(String topic, int partition) throws PulsarClientException {
        this.getConsumer(topic, partition).resume();
    }

    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
        this.inputConsumers = inputConsumers;
        inputConsumers.stream().flatMap(consumer -> consumer instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl)consumer).getConsumers().stream() : Stream.of(consumer)).forEach(consumer -> this.topicConsumers.putIfAbsent(TopicName.get((String)consumer.getTopic()), (Consumer)consumer));
    }

    private void reloadConsumersFromMultiTopicsConsumers() {
        this.inputConsumers.stream().flatMap(c -> c instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl)c).getConsumers().stream() : Stream.empty()).forEach(c -> this.topicConsumers.putIfAbsent(TopicName.get((String)c.getTopic()), (Consumer)c));
    }

    private Consumer<?> tryGetConsumer(String topic, int partition) {
        Consumer consumer;
        if (partition == 0 && (consumer = this.topicConsumers.get(TopicName.get((String)topic))) != null) {
            return consumer;
        }
        return this.topicConsumers.get(TopicName.get((String)topic).getPartition(partition));
    }

    @VisibleForTesting
    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
        if (this.inputConsumers == null) {
            throw new PulsarClientException("Getting consumer is not supported");
        }
        Consumer<?> consumer = this.tryGetConsumer(topic, partition);
        if (consumer == null) {
            this.reloadConsumersFromMultiTopicsConsumers();
            consumer = this.tryGetConsumer(topic, partition);
        }
        if (consumer != null) {
            return consumer;
        }
        throw new PulsarClientException("Consumer for topic " + topic + " partition " + partition + " is not found");
    }

    public String toString() {
        return "ContextImpl(config=" + this.config + ", logger=" + this.getLogger() + ", record=" + this.record + ", clientBuilder=" + this.clientBuilder + ", client=" + this.client + ", publishProducers=" + this.publishProducers + ", tlPublishProducers=" + this.tlPublishProducers + ", producerBuilder=" + this.producerBuilder + ", topicSchema=" + this.topicSchema + ", secretsProvider=" + this.secretsProvider + ", secretsMap=" + this.secretsMap + ", stateManager=" + this.stateManager + ", defaultStateStore=" + this.defaultStateStore + ", userConfigs=" + this.userConfigs + ", statsManager=" + this.statsManager + ", userMetricsLabels=" + this.userMetricsLabels + ", metricsLabels=" + Arrays.deepToString(this.metricsLabels) + ", userMetricsSummary=" + this.userMetricsSummary + ", subscriptionType=" + this.getSubscriptionType() + ", exposePulsarAdminClientEnabled=" + this.exposePulsarAdminClientEnabled + ", inputConsumers=" + this.inputConsumers + ", topicConsumers=" + this.topicConsumers + ", componentType=" + this.componentType + ")";
    }

    static {
        ContextImpl.userMetricsLabelNames[ComponentStatsManager.METRICS_LABEL_NAMES.length] = "metric";
    }

    class MessageBuilderImpl<T>
    implements TypedMessageBuilder<T> {
        private TypedMessageBuilder<T> underlyingBuilder;

        MessageBuilderImpl() {
        }

        public MessageId send() throws PulsarClientException {
            try {
                return this.sendAsync().get();
            }
            catch (Exception e) {
                throw PulsarClientException.unwrap((Throwable)e);
            }
        }

        public CompletableFuture<MessageId> sendAsync() {
            return this.underlyingBuilder.sendAsync().whenComplete((result, cause) -> {
                if (null != cause) {
                    ContextImpl.this.statsManager.incrSysExceptions((Throwable)cause);
                    ContextImpl.this.logger.error("Failed to publish to topic with error", cause);
                }
            });
        }

        public TypedMessageBuilder<T> key(String key) {
            this.underlyingBuilder.key(key);
            return this;
        }

        public TypedMessageBuilder<T> keyBytes(byte[] key) {
            this.underlyingBuilder.keyBytes(key);
            return this;
        }

        public TypedMessageBuilder<T> orderingKey(byte[] orderingKey) {
            this.underlyingBuilder.orderingKey(orderingKey);
            return this;
        }

        public TypedMessageBuilder<T> value(T value) {
            this.underlyingBuilder.value(value);
            return this;
        }

        public TypedMessageBuilder<T> property(String name, String value) {
            this.underlyingBuilder.property(name, value);
            return this;
        }

        public TypedMessageBuilder<T> properties(Map<String, String> properties) {
            this.underlyingBuilder.properties(properties);
            return this;
        }

        public TypedMessageBuilder<T> eventTime(long timestamp) {
            this.underlyingBuilder.eventTime(timestamp);
            return this;
        }

        public TypedMessageBuilder<T> sequenceId(long sequenceId) {
            this.underlyingBuilder.sequenceId(sequenceId);
            return this;
        }

        public TypedMessageBuilder<T> replicationClusters(List<String> clusters) {
            this.underlyingBuilder.replicationClusters(clusters);
            return this;
        }

        public TypedMessageBuilder<T> disableReplication() {
            this.underlyingBuilder.disableReplication();
            return this;
        }

        public TypedMessageBuilder<T> loadConf(Map<String, Object> config) {
            this.underlyingBuilder.loadConf(config);
            return this;
        }

        public TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit) {
            this.underlyingBuilder.deliverAfter(delay, unit);
            return this;
        }

        public TypedMessageBuilder<T> deliverAt(long timestamp) {
            this.underlyingBuilder.deliverAt(timestamp);
            return this;
        }

        public void setUnderlyingBuilder(TypedMessageBuilder<T> underlyingBuilder) {
            this.underlyingBuilder = underlyingBuilder;
        }
    }
}

