package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ActiveTaskCreator.class */
public class ActiveTaskCreator {
    private final TopologyMetadata topologyMetadata;
    private final StreamsConfig applicationConfig;
    private final StreamsMetricsImpl streamsMetrics;
    private final StateDirectory stateDirectory;
    private final ChangelogReader storeChangelogReader;
    private final ThreadCache cache;
    private final Time time;
    private final KafkaClientSupplier clientSupplier;
    private final String threadId;
    private final Logger log;
    private final Sensor createTaskSensor;
    private final StreamsProducer threadProducer;
    private final Map<TaskId, StreamsProducer> taskProducers;
    private final StreamsConfigUtils.ProcessingMode processingMode;
    private final boolean stateUpdaterEnabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ActiveTaskCreator(TopologyMetadata topologyMetadata, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ChangelogReader changelogReader, ThreadCache threadCache, Time time, KafkaClientSupplier kafkaClientSupplier, String str, UUID uuid, Logger logger, boolean z) {
        this.topologyMetadata = topologyMetadata;
        this.applicationConfig = streamsConfig;
        this.streamsMetrics = streamsMetricsImpl;
        this.stateDirectory = stateDirectory;
        this.storeChangelogReader = changelogReader;
        this.cache = threadCache;
        this.time = time;
        this.clientSupplier = kafkaClientSupplier;
        this.threadId = str;
        this.log = logger;
        this.stateUpdaterEnabled = z;
        this.createTaskSensor = ThreadMetrics.createTaskSensor(str, streamsMetricsImpl);
        this.processingMode = StreamsConfigUtils.processingMode(streamsConfig);
        if (this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            this.threadProducer = null;
            this.taskProducers = new HashMap();
        } else {
            logger.info("Creating thread producer client");
            this.threadProducer = new StreamsProducer(streamsConfig, str, kafkaClientSupplier, null, uuid, new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), time);
            this.taskProducers = Collections.emptyMap();
        }
    }

    public void reInitializeThreadProducer() {
        this.threadProducer.resetProducer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamsProducer streamsProducerForTask(TaskId taskId) {
        if (this.processingMode != StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            throw new IllegalStateException("Expected EXACTLY_ONCE to be enabled, but the processing mode was " + this.processingMode);
        }
        StreamsProducer streamsProducer = this.taskProducers.get(taskId);
        if (streamsProducer == null) {
            throw new IllegalStateException("Unknown TaskId: " + taskId);
        }
        return streamsProducer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamsProducer threadProducer() {
        if (this.processingMode != StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
            throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + this.processingMode);
        }
        return this.threadProducer;
    }

    public Collection<Task> createTasks(Consumer<byte[], byte[]> consumer, Map<TaskId, Set<TopicPartition>> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : map.entrySet()) {
            TaskId key = entry.getKey();
            LogContext logContext = getLogContext(key);
            Set<TopicPartition> value = entry.getValue();
            ProcessorTopology buildSubtopology = this.topologyMetadata.buildSubtopology(key);
            ProcessorStateManager processorStateManager = new ProcessorStateManager(key, Task.TaskType.ACTIVE, StreamsConfigUtils.eosEnabled(this.applicationConfig), logContext, this.stateDirectory, this.storeChangelogReader, buildSubtopology.storeToChangelogTopic(), value, this.stateUpdaterEnabled);
            arrayList.add(createActiveTask(key, value, consumer, logContext, buildSubtopology, processorStateManager, new ProcessorContextImpl(key, this.applicationConfig, processorStateManager, this.streamsMetrics, this.cache)));
        }
        return arrayList;
    }

    private RecordCollector createRecordCollector(TaskId taskId, LogContext logContext, ProcessorTopology processorTopology) {
        StreamsProducer streamsProducer;
        if (this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            this.log.info("Creating producer client for task {}", taskId);
            streamsProducer = new StreamsProducer(this.applicationConfig, this.threadId, this.clientSupplier, taskId, null, logContext, this.time);
            this.taskProducers.put(taskId, streamsProducer);
        } else {
            streamsProducer = this.threadProducer;
        }
        return new RecordCollectorImpl(logContext, taskId, streamsProducer, this.applicationConfig.defaultProductionExceptionHandler(), this.streamsMetrics, processorTopology);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamTask createActiveTaskFromStandby(StandbyTask standbyTask, Set<TopicPartition> set, Consumer<byte[], byte[]> consumer) {
        if (!set.equals(standbyTask.inputPartitions)) {
            this.log.warn("Detected unmatched input partitions for task {} when recycling it from standby to active", standbyTask.id);
        }
        standbyTask.prepareRecycle();
        standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE, getLogContext(standbyTask.id));
        StreamTask streamTask = new StreamTask(standbyTask.id, set, standbyTask.topology, consumer, standbyTask.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, standbyTask.stateMgr, createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology), standbyTask.processorContext, standbyTask.logContext);
        this.log.trace("Created active task {} from recycled standby task with assigned partitions {}", streamTask.id, set);
        this.createTaskSensor.record();
        return streamTask;
    }

    private StreamTask createActiveTask(TaskId taskId, Set<TopicPartition> set, Consumer<byte[], byte[]> consumer, LogContext logContext, ProcessorTopology processorTopology, ProcessorStateManager processorStateManager, InternalProcessorContext<Object, Object> internalProcessorContext) {
        StreamTask streamTask = new StreamTask(taskId, set, processorTopology, consumer, this.topologyMetadata.getTaskConfigFor(taskId), this.streamsMetrics, this.stateDirectory, this.cache, this.time, processorStateManager, createRecordCollector(taskId, logContext, processorTopology), internalProcessorContext, logContext);
        this.log.trace("Created active task {} with assigned partitions {}", taskId, set);
        this.createTaskSensor.record();
        return streamTask;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeThreadProducerIfNeeded() {
        if (this.threadProducer != null) {
            try {
                this.threadProducer.close();
            } catch (RuntimeException e) {
                throw new StreamsException("Thread producer encounter error trying to close.", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeAndRemoveTaskProducerIfNeeded(TaskId taskId) {
        StreamsProducer remove = this.taskProducers.remove(taskId);
        if (remove != null) {
            try {
                remove.close();
            } catch (RuntimeException e) {
                throw new StreamsException("[" + taskId + "] task producer encounter error trying to close.", e, taskId);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<MetricName, Metric> producerMetrics() {
        return ClientUtils.producerMetrics(this.threadProducer != null ? Collections.singleton(this.threadProducer) : this.taskProducers.values());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<String> producerClientIds() {
        return this.threadProducer != null ? Collections.singleton(ClientUtils.getThreadProducerClientId(this.threadId)) : (Set) this.taskProducers.keySet().stream().map(taskId -> {
            return ClientUtils.getTaskProducerClientId(this.threadId, taskId);
        }).collect(Collectors.toSet());
    }

    private LogContext getLogContext(TaskId taskId) {
        return new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()) + String.format("%s [%s] ", "stream-task", taskId));
    }

    public double totalProducerBlockedTime() {
        return this.threadProducer != null ? this.threadProducer.totalBlockedTime() : this.taskProducers.values().stream().mapToDouble((v0) -> {
            return v0.totalBlockedTime();
        }).sum();
    }
}
