package org.apache.kafka.streams.processor.internals;

import ch.qos.logback.classic.net.SyslogAppender;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/processor/internals/StreamTask.class */
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, Task {
    // Placeholder record (AbstractProcessorContext.NONEXIST_TOPIC, partition -1, offset -1, null key and value)
    // that punctuate() wraps in a StampedRecord when it sets up the processor context.
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(AbstractProcessorContext.NONEXIST_TOPIC, -1, -1, null, null);
    // Not referenced in the visible part of this file.
    // NOTE(review): presumably the version byte of the timestamp encoding stored in commit metadata — confirm.
    static final byte LATEST_MAGIC_BYTE = 1;
    private final Time time;
    private final Logger log;
    // "stream-thread [<thread name>] task [<task id>] " — built in the constructor.
    private final String logPrefix;
    private final Consumer<byte[], byte[]> mainConsumer;
    // Result of StreamThread.eosEnabled(config); postCommit() skips the checkpoint of a RUNNING task when true.
    private final boolean eosEnabled;
    // Value of StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; isProcessable() compares the idle time against it.
    private final long maxTaskIdleMs;
    // Value of StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; process() resumes a partition
    // when its queue size equals this value.
    private final int maxBufferedSize;
    private final PartitionGroup partitionGroup;
    private final RecordCollector recordCollector;
    // Reused holder filled by partitionGroup.nextRecord() in process().
    private final PartitionGroup.RecordInfo recordInfo;
    // Offset of the most recent record handled by process(), per partition.
    private final Map<TopicPartition, Long> consumedOffsets;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final StreamsMetricsImpl streamsMetrics;
    // Accumulated by recordProcessBatchTime(); reset by recordProcessTimeRatioAndBufferSize().
    private long processTimeMs;
    private final Sensor closeTaskSensor;
    private final Sensor processRatioSensor;
    private final Sensor processLatencySensor;
    private final Sensor punctuateLatencySensor;
    private final Sensor bufferedRecordsSensor;
    private final Sensor enforcedProcessingSensor;
    // Sensors keyed by node name; the constructor registers one per terminal node and per source node.
    private final Map<String, Sensor> e2eLatencySensors;
    private final InternalProcessorContext processorContext;
    private final RecordQueueCreator recordQueueCreator;
    // Time at which isProcessable() first saw only some partitions buffered; -1 when not waiting.
    private long idleStartTimeMs;
    // Set by process() after a record is handled; cleared by writeCheckpoint().
    // validateClean() refuses a clean close while it is set.
    private boolean commitNeeded;
    // Cleared by postCommit(); the code that sets it is not in the visible part of this file.
    private boolean commitRequested;
    // Set by suspend(): true when suspending from RESTORING or after a successful topology close from
    // RUNNING, false from CREATED. Tells postCommit() whether a SUSPENDED task should be checkpointed.
    private boolean checkpointNeededForSuspended;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/processor/internals/StreamTask$RecordQueueCreator.class */
    /**
     * Factory for the per-partition {@link RecordQueue}s of the enclosing task. It carries the
     * log context and the default timestamp extractor / deserialization exception handler that
     * are applied when creating a queue.
     */
    public class RecordQueueCreator {
        private final LogContext logContext;
        private final TimestampExtractor defaultTimestampExtractor;
        private final DeserializationExceptionHandler defaultDeserializationExceptionHandler;

        private RecordQueueCreator(LogContext logContext, TimestampExtractor defaultTimestampExtractor, DeserializationExceptionHandler defaultDeserializationExceptionHandler) {
            this.logContext = logContext;
            this.defaultTimestampExtractor = defaultTimestampExtractor;
            this.defaultDeserializationExceptionHandler = defaultDeserializationExceptionHandler;
        }

        /**
         * Builds the queue for one input partition.
         *
         * @param topicPartition partition whose topic selects the source node of the topology
         * @return a new queue bound to that source node and to the enclosing task's processor context
         */
        public RecordQueue createQueue(TopicPartition topicPartition) {
            final SourceNode<?, ?> sourceNode = StreamTask.this.topology.source(topicPartition.topic());
            // A source-specific extractor wins; otherwise fall back to the configured default.
            TimestampExtractor extractor = sourceNode.getTimestampExtractor();
            if (extractor == null) {
                extractor = this.defaultTimestampExtractor;
            }
            return new RecordQueue(
                topicPartition,
                sourceNode,
                extractor,
                this.defaultDeserializationExceptionHandler,
                StreamTask.this.processorContext,
                this.logContext);
        }
    }

    /**
     * Creates an active stream task: wires the processor context to this task, builds the log
     * prefix and logger, registers the task-level sensors, reads the idle/buffer limits from the
     * config, creates one record queue per input partition and registers the topology's global
     * state stores with the state manager.
     */
    public StreamTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ThreadCache threadCache, Time time, ProcessorStateManager processorStateManager, RecordCollector recordCollector, InternalProcessorContext internalProcessorContext) {
        super(taskId, processorTopology, stateDirectory, processorStateManager, set);
        final String threadName = Thread.currentThread().getName();

        this.processTimeMs = 0L;
        this.e2eLatencySensors = new HashMap<>();
        this.commitNeeded = false;
        this.commitRequested = false;
        this.checkpointNeededForSuspended = false;
        this.mainConsumer = consumer;
        this.processorContext = internalProcessorContext;
        internalProcessorContext.transitionToActive(this, recordCollector, threadCache);

        this.logPrefix = String.format("stream-thread [%s] %s [%s] ", threadName, "task", taskId);
        final LogContext taskLogContext = new LogContext(this.logPrefix);
        this.log = taskLogContext.logger(getClass());
        this.time = time;
        this.recordCollector = recordCollector;
        this.eosEnabled = StreamThread.eosEnabled(streamsConfig);
        this.streamsMetrics = streamsMetricsImpl;

        // Sensors are created in the same order as before.
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(threadName, streamsMetricsImpl);
        final String taskName = taskId.toString();
        if (streamsMetricsImpl.version() != StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            this.enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadName, taskName, streamsMetricsImpl, new Sensor[0]);
        } else {
            // The FROM_0100_TO_24 metrics version additionally passes the thread-level commit-over-tasks sensor.
            this.enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadName, taskName, streamsMetricsImpl, ThreadMetrics.commitOverTasksSensor(threadName, streamsMetricsImpl));
        }
        this.processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadName, taskName, streamsMetricsImpl);
        this.processLatencySensor = TaskMetrics.processLatencySensor(threadName, taskName, streamsMetricsImpl);
        this.punctuateLatencySensor = TaskMetrics.punctuateSensor(threadName, taskName, streamsMetricsImpl);
        this.bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadName, taskName, streamsMetricsImpl);

        // One end-to-end latency sensor per terminal node, then per source node.
        for (final String terminalNodeName : processorTopology.terminalNodes()) {
            this.e2eLatencySensors.put(terminalNodeName, TaskMetrics.e2ELatencySensor(threadName, taskName, terminalNodeName, Sensor.RecordingLevel.INFO, streamsMetricsImpl));
        }
        for (final SourceNode<?, ?> sourceNode : processorTopology.sources()) {
            final String sourceNodeName = sourceNode.name();
            this.e2eLatencySensors.put(sourceNodeName, TaskMetrics.e2ELatencySensor(threadName, taskName, sourceNodeName, Sensor.RecordingLevel.INFO, streamsMetricsImpl));
        }

        this.streamTimePunctuationQueue = new PunctuationQueue();
        this.systemTimePunctuationQueue = new PunctuationQueue();
        this.maxTaskIdleMs = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
        this.maxBufferedSize = streamsConfig.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
        this.consumedOffsets = new HashMap<>();
        this.recordQueueCreator = new RecordQueueCreator(taskLogContext, streamsConfig.defaultTimestampExtractor(), streamsConfig.defaultDeserializationExceptionHandler());
        this.recordInfo = new PartitionGroup.RecordInfo();
        this.partitionGroup = new PartitionGroup(createPartitionQueues(), TaskMetrics.recordLatenessSensor(threadName, taskName, streamsMetricsImpl));
        processorStateManager.registerGlobalStateStores(processorTopology.globalStateStores());
    }

    /**
     * Creates one record queue for every input partition of this task.
     *
     * @return a new mutable map from input partition to its freshly created queue
     */
    private Map<TopicPartition, RecordQueue> createPartitionQueues() {
        final Map<TopicPartition, RecordQueue> queuesByPartition = new HashMap<>();
        for (final TopicPartition inputPartition : inputPartitions()) {
            final RecordQueue queue = this.recordQueueCreator.createQueue(inputPartition);
            queuesByPartition.put(inputPartition, queue);
        }
        return queuesByPartition;
    }

    /**
     * Reports whether this is an active task.
     *
     * @return always {@code true} for this class
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean isActive() {
        return true;
    }

    /**
     * Initializes the task when it is still in {@code CREATED}: initializes the record
     * collector, registers the state stores and moves the task to {@code RESTORING}.
     * In any other state this is a no-op.
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeIfNeeded() {
        if (state() != Task.State.CREATED) {
            return;
        }
        this.recordCollector.initialize();
        StateManagerUtil.registerStateStores(this.log, this.logPrefix, this.topology, this.stateMgr, this.stateDirectory, this.processorContext);
        transitionTo(Task.State.RESTORING);
        this.log.info("Initialized");
    }

    /**
     * Finishes restoration: for a {@code RESTORING} task it initializes metadata, topology and
     * processor context, resets the idle timer and moves the task to {@code RUNNING}.
     * A task that is already {@code RUNNING} is left untouched.
     *
     * @throws IllegalStateException if the task is {@code CREATED}, {@code SUSPENDED},
     *                               {@code CLOSED} or in an unknown state
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void completeRestoration() {
        switch (state()) {
            case CREATED:
            case SUSPENDED:
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while completing restoration for active task " + this.id);
            case RESTORING:
                initializeMetadata();
                initializeTopology();
                this.processorContext.initialize();
                this.idleStartTimeMs = -1L;
                transitionTo(Task.State.RUNNING);
                this.log.info("Restored and ready to run");
                break;
            case RUNNING:
                // Already running: nothing to do.
                break;
            default:
                throw new IllegalStateException("Unknown state " + state() + " while completing restoration for active task " + this.id);
        }
    }

    /**
     * Moves the task to {@code SUSPENDED} and records whether a checkpoint should be written
     * at the next post-commit. A task that is already suspended is only logged.
     *
     * @throws IllegalStateException if the task is {@code CLOSED} or in an unknown state
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void suspend() {
        switch (state()) {
            case CREATED:
                this.log.info("Suspended created");
                this.checkpointNeededForSuspended = false;
                transitionTo(Task.State.SUSPENDED);
                break;
            case RESTORING:
                this.log.info("Suspended restoring");
                this.checkpointNeededForSuspended = true;
                transitionTo(Task.State.SUSPENDED);
                break;
            case RUNNING:
                try {
                    closeTopology();
                    this.checkpointNeededForSuspended = true;
                } finally {
                    // The transition happens even if closing the topology threw.
                    transitionTo(Task.State.SUSPENDED);
                    this.log.info("Suspended running");
                }
                break;
            case SUSPENDED:
                this.log.info("Skip suspending since state is {}", state());
                break;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + this.id);
        }
    }

    /**
     * Closes every processor node of the topology. All nodes are attempted even when one of
     * them throws a {@link RuntimeException}; the last such exception is rethrown afterwards.
     */
    private void closeTopology() {
        this.log.trace("Closing processor topology");
        RuntimeException lastFailure = null;
        for (final ProcessorNode<?, ?> node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.close();
            } catch (RuntimeException e) {
                // Remember the failure but keep closing the remaining nodes.
                lastFailure = e;
            } finally {
                this.processorContext.setCurrentNode(null);
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

    /**
     * Resumes a {@code SUSPENDED} task by moving it back to {@code RESTORING}. Tasks in
     * {@code CREATED}, {@code RESTORING} or {@code RUNNING} are left as they are.
     *
     * @throws IllegalStateException if the task is {@code CLOSED} or in an unknown state
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void resume() {
        switch (state()) {
            case SUSPENDED:
                transitionTo(Task.State.RESTORING);
                this.log.info("Resumed to restoring state");
                break;
            case CREATED:
            case RESTORING:
            case RUNNING:
                this.log.trace("Skip resuming since state is {}", state());
                break;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while resuming active task " + this.id);
        }
    }

    /**
     * Prepares the task for a commit. The state manager is always flushed; the record collector
     * is flushed and offsets are collected only when there is something to commit.
     *
     * @return the offsets and metadata to commit, or an empty map when no commit is needed
     * @throws IllegalStateException if the task is {@code CLOSED} or in an unknown state
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (state()) {
            case CREATED:
            case RESTORING:
            case RUNNING:
            case SUSPENDED:
                this.stateMgr.flush();
                if (!this.commitNeeded) {
                    this.log.debug("Skipped preparing {} task for commit since there is nothing to commit", state());
                    return Collections.emptyMap();
                }
                this.recordCollector.flush();
                this.log.debug("Prepared {} task for committing", state());
                return committableOffsetsAndMetadata();
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + this.id + " for committing");
            default:
                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + this.id + " for committing");
        }
    }

    /**
     * Collects the offsets (with encoded partition time as metadata) that can be committed for
     * this task.
     *
     * <p>For {@code RUNNING} and {@code SUSPENDED} tasks, every partition present in
     * {@code consumedOffsets} is committed at the offset of the head record still buffered for
     * it, or at the consumer's current position when nothing is buffered. {@code CREATED} and
     * {@code RESTORING} tasks have nothing to commit.
     *
     * @return partition to offset-and-metadata map; empty for {@code CREATED}/{@code RESTORING}
     * @throws IllegalStateException if the task is {@code CLOSED} or in an unknown state, or if
     *                               the consumer position lookup times out
     * @throws StreamsException      if the consumer position lookup fails with another
     *                               {@link KafkaException}
     */
    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
        // Declared as Map (not HashMap) so that both the empty-map and the populated branch type-check.
        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
        switch (state()) {
            case CREATED:
            case RESTORING:
                committableOffsets = Collections.emptyMap();
                break;
            case RUNNING:
            case SUSPENDED: {
                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
                committableOffsets = new HashMap<>(this.consumedOffsets.size());
                for (final TopicPartition partition : this.consumedOffsets.keySet()) {
                    Long offset = this.partitionGroup.headRecordOffset(partition);
                    if (offset == null) {
                        // Nothing buffered for this partition: fall back to the consumer position.
                        try {
                            offset = this.mainConsumer.position(partition);
                        } catch (TimeoutException e) {
                            // A timeout here is rethrown as an illegal state rather than retried.
                            throw new IllegalStateException(e);
                        } catch (KafkaException e) {
                            throw new StreamsException(e);
                        }
                    }
                    final long partitionTime = partitionTimes.get(partition);
                    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
                }
                break;
            }
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while getting committable offsets for active task " + this.id);
        }
        return committableOffsets;
    }

    /**
     * Finalizes a commit: clears the commit-requested flag and, depending on the task state,
     * writes a checkpoint.
     *
     * @throws IllegalStateException if the task is {@code CLOSED} or in an unknown state
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void postCommit() {
        this.commitRequested = false;
        switch (state()) {
            case CREATED:
                this.log.debug("Skipped writing checkpoint for created task");
                break;
            case RESTORING:
                writeCheckpoint();
                this.log.debug("Finalized commit for restoring task");
                break;
            case RUNNING:
                // NOTE(review): with EOS enabled no checkpoint is written here, so commitNeeded
                // (reset only inside writeCheckpoint) stays set on this path — confirm this is intended.
                if (!this.eosEnabled) {
                    writeCheckpoint();
                }
                this.log.debug("Finalized commit for running task");
                break;
            case SUSPENDED:
                this.partitionGroup.clear();
                if (this.checkpointNeededForSuspended) {
                    writeCheckpoint();
                    this.log.debug("Finalized commit for suspended task");
                    this.checkpointNeededForSuspended = false;
                } else {
                    this.log.debug("Skipped writing checkpoint for uninitialized suspended task");
                }
                break;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + this.id);
        }
    }

    /**
     * Snapshots the current partition timestamp of every partition in the partition group.
     *
     * @return a new map from partition to the timestamp reported by the partition group
     */
    private Map<TopicPartition, Long> extractPartitionTimes() {
        final Map<TopicPartition, Long> timesByPartition = new HashMap<>();
        for (final TopicPartition partition : this.partitionGroup.partitions()) {
            final long partitionTime = this.partitionGroup.partitionTimestamp(partition);
            timesByPartition.put(partition, partitionTime);
        }
        return timesByPartition;
    }

    /**
     * Closes the task cleanly: verifies there is no uncommitted data, removes the task-level
     * sensors and closes the task's resources in clean mode.
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeClean() {
        validateClean();
        final String threadName = Thread.currentThread().getName();
        final String taskName = this.id.toString();
        this.streamsMetrics.removeAllTaskLevelSensors(threadName, taskName);
        close(true);
        this.log.info("Closed clean");
    }

    /**
     * Closes the task without the clean-close validation: removes the task-level sensors and
     * closes the task's resources in dirty mode.
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeDirty() {
        final String threadName = Thread.currentThread().getName();
        final String taskName = this.id.toString();
        this.streamsMetrics.removeAllTaskLevelSensors(threadName, taskName);
        close(false);
        this.log.info("Closed dirty");
    }

    /**
     * Updates the task's input partitions: delegates to the superclass and then lets the
     * partition group update its partitions, using the task's queue creator to build queues.
     *
     * @param set the new set of input partitions
     * @param map passed through unchanged to the superclass
     */
    @Override // org.apache.kafka.streams.processor.internals.AbstractTask, org.apache.kafka.streams.processor.internals.Task
    public void update(Set<TopicPartition> set, Map<String, List<String>> map) {
        super.update(set, map);
        this.partitionGroup.updatePartitions(set, this.recordQueueCreator::createQueue);
    }

    /**
     * Closes a {@code SUSPENDED} task cleanly while recycling its state manager instead of
     * closing it. Task-level sensors are removed before the state is checked.
     *
     * @throws IllegalStateException if the task is in any state other than {@code SUSPENDED}
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeCleanAndRecycleState() {
        validateClean();
        final String threadName = Thread.currentThread().getName();
        final String taskName = this.id.toString();
        this.streamsMetrics.removeAllTaskLevelSensors(threadName, taskName);
        switch (state()) {
            case SUSPENDED:
                this.stateMgr.recycle();
                this.recordCollector.closeClean();
                this.closeTaskSensor.record();
                transitionTo(Task.State.CLOSED);
                this.log.info("Closed clean and recycled state");
                break;
            case CREATED:
            case RESTORING:
            case RUNNING:
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + this.id);
        }
    }

    /**
     * Writes a checkpoint through the state manager — with the checkpointable offsets when a
     * commit was pending, otherwise with an empty offset map — and clears the commit-needed flag.
     */
    private void writeCheckpoint() {
        final Map<TopicPartition, Long> offsetsToCheckpoint;
        if (!this.commitNeeded) {
            offsetsToCheckpoint = Collections.emptyMap();
        } else {
            offsetsToCheckpoint = checkpointableOffsets();
        }
        this.stateMgr.checkpoint(offsetsToCheckpoint);
        this.commitNeeded = false;
    }

    /**
     * Guards clean-close paths: a task that still has uncommitted data must not be closed clean.
     *
     * @throws TaskMigratedException if a commit is still pending
     */
    private void validateClean() {
        if (!this.commitNeeded) {
            return;
        }
        this.log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to commit and should close as dirty instead");
        throw new TaskMigratedException("Tried to close dirty task as clean");
    }

    /**
     * Closes a {@code SUSPENDED} task: closes the state manager and the record collector
     * (each through {@code TaskManager.executeAndMaybeSwallow}), clears the partition group,
     * records the close sensor and moves the task to {@code CLOSED}. An already closed task
     * is skipped.
     *
     * @param z {@code true} for a clean close, {@code false} for a dirty close
     * @throws IllegalStateException if the task is {@code CREATED}, {@code RESTORING},
     *                               {@code RUNNING} or in an unknown state
     */
    private void close(boolean z) {
        switch (state()) {
            case CREATED:
            case RESTORING:
            case RUNNING:
                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + this.id);
            case SUSPENDED:
                TaskManager.executeAndMaybeSwallow(
                    z,
                    () -> StateManagerUtil.closeStateManager(this.log, this.logPrefix, z, this.eosEnabled, this.stateMgr, this.stateDirectory, Task.TaskType.ACTIVE),
                    "state manager close",
                    this.log);
                final Runnable closeCollector;
                if (!z) {
                    closeCollector = this.recordCollector::closeDirty;
                } else {
                    closeCollector = this.recordCollector::closeClean;
                }
                TaskManager.executeAndMaybeSwallow(z, closeCollector, "record collector close", this.log);
                this.partitionGroup.clear();
                this.closeTaskSensor.record();
                transitionTo(Task.State.CLOSED);
                break;
            case CLOSED:
                this.log.trace("Skip closing since state is {}", state());
                break;
            default:
                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + this.id);
        }
    }

    /**
     * Decides whether the task may process a record now.
     *
     * <p>A closed task is never processable. A task with all partitions buffered always is.
     * A task with only some data buffered becomes processable once it has been waiting for at
     * least {@code maxTaskIdleMs}; that case is recorded on the enforced-processing sensor.
     *
     * @param j the current time in milliseconds, used for the idle-time bookkeeping
     * @return {@code true} if a record may be processed
     */
    public boolean isProcessable(long j) {
        if (state() == Task.State.CLOSED) {
            this.log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
            return false;
        }

        final boolean processable;
        if (this.partitionGroup.allPartitionsBuffered()) {
            this.idleStartTimeMs = -1L;
            processable = true;
        } else if (this.partitionGroup.numBuffered() > 0) {
            // Some, but not all, partitions have data: start (or continue) the idle timer.
            if (this.idleStartTimeMs == -1L) {
                this.idleStartTimeMs = j;
            }
            if (j - this.idleStartTimeMs >= this.maxTaskIdleMs) {
                this.enforcedProcessingSensor.record(1.0d, j);
                processable = true;
            } else {
                processable = false;
            }
        } else {
            // Nothing buffered at all.
            this.idleStartTimeMs = -1L;
            processable = false;
        }
        return processable;
    }

    /**
     * Processes at most one buffered record.
     *
     * <p>After the record's source node has processed it, the record's offset is remembered in
     * {@code consumedOffsets}, the task is marked as needing a commit, and the partition is
     * resumed on the consumer when its queue size equals {@code maxBufferedSize}.
     *
     * @param j the current wall-clock time in milliseconds
     * @return {@code true} if a record was processed, {@code false} if the task was not
     *         processable or no record was available
     * @throws StreamsException if processing fails; a {@link StreamsException} thrown by the
     *                          processor is propagated unchanged (so subclasses remain visible
     *                          to callers), any other {@link RuntimeException} is wrapped with
     *                          task/record details
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean process(long j) {
        if (!isProcessable(j)) {
            return false;
        }
        final StampedRecord nextRecord = this.partitionGroup.nextRecord(this.recordInfo, j);
        try {
            // Kept inside the try so that the current node is reset even when there is no record.
            if (nextRecord == null) {
                return false;
            }
            // The node's key/value types are not known statically; records are passed as Objects.
            @SuppressWarnings("unchecked")
            final ProcessorNode<Object, Object> node = (ProcessorNode<Object, Object>) this.recordInfo.node();
            final TopicPartition partition = this.recordInfo.partition();
            this.log.trace("Start processing one record [{}]", nextRecord);
            updateProcessorContext(nextRecord, node, j);
            maybeRecordE2ELatency(nextRecord.timestamp, j, node.name());
            StreamsMetricsImpl.maybeMeasureLatency(() -> {
                node.process(nextRecord.key(), nextRecord.value());
            }, this.time, this.processLatencySensor);
            this.log.trace("Completed processing one record [{}]", nextRecord);
            this.consumedOffsets.put(partition, nextRecord.offset());
            this.commitNeeded = true;
            if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                this.mainConsumer.resume(Collections.singleton(partition));
            }
            return true;
        } catch (StreamsException e) {
            // Must come before the RuntimeException handler: StreamsException is itself a
            // RuntimeException and must not be re-wrapped (same order as in punctuate()).
            throw e;
        } catch (RuntimeException e) {
            throw new StreamsException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", id(), this.processorContext.currentNode().name(), nextRecord.topic(), nextRecord.partition(), nextRecord.offset(), getStacktraceString(e)), e);
        } finally {
            this.processorContext.setCurrentNode(null);
        }
    }

    /**
     * Adds the given duration to the processing time accumulated since the last ratio recording.
     *
     * @param j processing time to add, in milliseconds
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void recordProcessBatchTime(long j) {
        this.processTimeMs = this.processTimeMs + j;
    }

    /**
     * Records the number of buffered records and this task's share of the total processing
     * time, then resets the accumulated processing time.
     *
     * @param j  total processing time the accumulated task time is divided by, in milliseconds
     * @param j2 timestamp passed to the ratio sensor
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void recordProcessTimeRatioAndBufferSize(long j, long j2) {
        this.bufferedRecordsSensor.record(this.partitionGroup.numBuffered());
        // Divide as double: long/long would truncate the ratio to a whole number (almost always 0)
        // and throw ArithmeticException when j is 0.
        this.processRatioSensor.record((double) this.processTimeMs / j, j2);
        this.processTimeMs = 0L;
    }

    /**
     * Renders the stack trace of the given exception as a string.
     *
     * @param runtimeException the exception whose stack trace is rendered
     * @return the stack trace text, or {@code null} if closing the writers failed with an
     *         {@link IOException} (which is logged)
     */
    private String getStacktraceString(RuntimeException runtimeException) {
        String stacktrace = null;
        // try-with-resources closes both writers on every path, in reverse order of creation.
        try (StringWriter stringWriter = new StringWriter();
             PrintWriter printWriter = new PrintWriter(stringWriter)) {
            runtimeException.printStackTrace(printWriter);
            printWriter.flush();
            stacktrace = stringWriter.toString();
        } catch (IOException e) {
            this.log.error("Encountered error extracting stacktrace from this exception", e);
        }
        return stacktrace;
    }

    /**
     * Runs one punctuation on the given processor node, measuring its latency.
     *
     * @param processorNode   the node to punctuate
     * @param j               the punctuation timestamp
     * @param punctuationType the punctuation type (only used for trace logging here)
     * @param punctuator      the callback handed to the node
     * @throws IllegalStateException if the processor context already has a current node
     * @throws StreamsException      if punctuation fails; other runtime exceptions are wrapped
     */
    @Override // org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator
    public void punctuate(ProcessorNode<?, ?> processorNode, long j, PunctuationType punctuationType, Punctuator punctuator) {
        if (this.processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%sCurrent node is not null", this.logPrefix));
        }

        // Punctuation has no input record, so a placeholder record carries the timestamp.
        final StampedRecord placeholder = new StampedRecord(DUMMY_RECORD, j);
        updateProcessorContext(placeholder, processorNode, this.time.milliseconds());

        if (this.log.isTraceEnabled()) {
            this.log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", processorNode.name(), j, punctuationType);
        }

        try {
            StreamsMetricsImpl.maybeMeasureLatency(() -> processorNode.punctuate(j, punctuator), this.time, this.punctuateLatencySensor);
        } catch (StreamsException e) {
            throw e;
        } catch (RuntimeException e) {
            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", this.logPrefix, processorNode.name()), e);
        } finally {
            this.processorContext.setCurrentNode(null);
        }
    }

    /**
     * Points the processor context at the given record and node.
     *
     * @param stampedRecord record whose timestamp, offset, partition, topic and headers form
     *                      the new record context
     * @param processorNode node to set as the current node
     * @param j             system time in milliseconds to set on the context
     */
    private void updateProcessorContext(StampedRecord stampedRecord, ProcessorNode<?, ?> processorNode, long j) {
        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
            stampedRecord.timestamp,
            stampedRecord.offset(),
            stampedRecord.partition(),
            stampedRecord.topic(),
            stampedRecord.headers());
        this.processorContext.setRecordContext(recordContext);
        this.processorContext.setCurrentNode(processorNode);
        this.processorContext.setSystemTimeMs(j);
    }

    /**
     * Builds the offsets to checkpoint: the record collector's offsets, complemented by the
     * consumed offsets for partitions the collector has no entry for.
     *
     * @return a new map of offsets to checkpoint
     */
    private Map<TopicPartition, Long> checkpointableOffsets() {
        final Map<TopicPartition, Long> offsets = new HashMap<>(this.recordCollector.offsets());
        // Collector offsets take precedence; consumed offsets only fill the gaps.
        this.consumedOffsets.forEach(offsets::putIfAbsent);
        return offsets;
    }

    /**
     * Fetches the committed offsets of this task's input partitions from the main consumer and
     * passes them to {@link #initializeTaskTime(Map)}.
     *
     * <p>Partitions whose committed entry is {@code null} are filtered out before the hand-off.
     *
     * @throws TimeoutException if fetching the committed offsets timed out; a warning is logged and
     *                          the exception is rethrown unchanged
     * @throws StreamsException if any other {@link KafkaException} is raised
     */
    private void initializeMetadata() {
        try {
            final Map<TopicPartition, OffsetAndMetadata> committed = this.mainConsumer.committed(inputPartitions())
                .entrySet()
                .stream()
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            initializeTaskTime(committed);
        } catch (TimeoutException e) {
            this.log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop.\nConsider overwriting consumer config {} to a larger value to avoid timeout errors", e.toString(), "default.api.timeout.ms");
            throw e;
        } catch (KafkaException e) {
            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", this.id, inputPartitions()), e);
        }
    }

    /**
     * Restores partition times from the timestamps encoded in committed offset metadata.
     *
     * <p>For every entry with a non-null value, the metadata string is decoded with
     * {@link #decodeTimestamp(String)} and the result is set as that partition's time on the
     * partition group. Input partitions that are missing from {@code map} are only reported at
     * debug level.
     *
     * @param map committed offsets and metadata keyed by partition
     */
    private void initializeTaskTime(Map<TopicPartition, OffsetAndMetadata> map) {
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            final TopicPartition partition = entry.getKey();
            final OffsetAndMetadata metadata = entry.getValue();
            if (metadata != null) {
                final long committedTimestamp = decodeTimestamp(metadata.metadata());
                this.partitionGroup.setPartitionTime(partition, committedTimestamp);
                this.log.debug("A committed timestamp was detected: setting the partition time of partition {} to {} in stream task {}", partition, committedTimestamp, this.id);
            } else {
                this.log.debug("No committed timestamp was found in metadata for partition {}", partition);
            }
        }

        // Whatever is left after removing the committed partitions has no committed offset at all.
        final Set<TopicPartition> nonCommitted = new HashSet<>(inputPartitions());
        nonCommitted.removeAll(map.keySet());
        for (final TopicPartition partition : nonCommitted) {
            this.log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
        }
    }

    /**
     * Collects, for every consumed partition that belongs to a repartition topic, the consumed
     * offset plus one.
     *
     * <p>Partitions for which {@code topology.isRepartitionTopic} returns {@code false} are left out.
     *
     * @return a new, mutable map from repartition-topic partition to {@code consumedOffset + 1}
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> purgeableOffsets() {
        final Map<TopicPartition, Long> purgeable = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> consumed : this.consumedOffsets.entrySet()) {
            final TopicPartition partition = consumed.getKey();
            if (this.topology.isRepartitionTopic(partition.topic())) {
                purgeable.put(partition, consumed.getValue() + 1);
            }
        }
        return purgeable;
    }

    /**
     * Calls {@code init} on every processor node of the topology.
     *
     * <p>Each node is installed as the context's current node while its {@code init} runs. The
     * current node is cleared again afterwards, even when {@code init} throws.
     */
    private void initializeTopology() {
        log.trace("Initializing processor nodes of the topology");
        for (final ProcessorNode<?, ?> node : topology.processors()) {
            processorContext.setCurrentNode(node);
            try {
                node.init(processorContext);
            } finally {
                // Never leave a stale current node behind, whether init succeeded or not.
                processorContext.setCurrentNode(null);
            }
        }
    }

    /**
     * Appends raw records to the buffered queue of the given partition.
     *
     * <p>If the queue size reported by the partition group exceeds {@code maxBufferedSize}, the
     * partition is paused on the main consumer.
     *
     * @param topicPartition partition the records were fetched from
     * @param iterable       raw records to buffer
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        final int newQueueSize = partitionGroup.addRawRecords(topicPartition, iterable);

        if (log.isTraceEnabled()) {
            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", topicPartition, newQueueSize);
        }

        final boolean overLimit = newQueueSize > maxBufferedSize;
        if (overLimit) {
            mainConsumer.pause(Collections.singleton(topicPartition));
        }
    }

    /**
     * Schedules a punctuator, deriving the start time from the punctuation type.
     *
     * <p>{@code STREAM_TIME} schedules start at {@code 0}. {@code WALL_CLOCK_TIME} schedules start at
     * the current {@code time.milliseconds()} plus {@code j}.
     *
     * @param j               forwarded unchanged as the second argument of the four-argument overload
     *                        (presumably the punctuation interval in milliseconds; confirm against callers)
     * @param punctuationType which clock drives the punctuation
     * @param punctuator      callback to schedule
     * @return handle returned by the four-argument overload
     * @throws IllegalArgumentException if the punctuation type is not handled
     */
    public Cancellable schedule(long j, PunctuationType punctuationType, Punctuator punctuator) {
        final long startTime;
        switch (punctuationType) {
            case STREAM_TIME:
                startTime = 0L;
                break;
            case WALL_CLOCK_TIME:
                startTime = this.time.milliseconds() + j;
                break;
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + punctuationType);
        }
        return schedule(startTime, j, punctuationType, punctuator);
    }

    /**
     * Registers a punctuation schedule for the context's current node on the queue that matches the
     * punctuation type.
     *
     * @param j               first time argument passed to {@code PunctuationSchedule}
     * @param j2              second time argument passed to {@code PunctuationSchedule}
     * @param punctuationType selects the stream-time or the system-time punctuation queue
     * @param punctuator      callback to schedule
     * @return handle returned by the selected queue's {@code schedule} call
     * @throws IllegalStateException    if the processor context has no current node
     * @throws IllegalArgumentException if the punctuation type is not handled
     */
    private Cancellable schedule(long j, long j2, PunctuationType punctuationType, Punctuator punctuator) {
        final boolean noCurrentNode = processorContext.currentNode() == null;
        if (noCurrentNode) {
            throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
        }

        final PunctuationSchedule newSchedule =
            new PunctuationSchedule(processorContext.currentNode(), j, j2, punctuator);

        switch (punctuationType) {
            case STREAM_TIME: {
                return streamTimePunctuationQueue.schedule(newSchedule);
            }
            case WALL_CLOCK_TIME: {
                return systemTimePunctuationQueue.schedule(newSchedule);
            }
            default: {
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + punctuationType);
            }
        }
    }

    /**
     * Triggers stream-time punctuations that are due at the partition group's current stream time.
     *
     * <p>Nothing happens when the stream time is {@code -1} (presumably the "unknown" sentinel;
     * confirm against the partition group). When the queue reports that it punctuated, the task is
     * marked as needing a commit.
     *
     * @return the result of the stream-time queue's {@code mayPunctuate}, or {@code false} when the
     *         stream time is {@code -1}
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean maybePunctuateStreamTime() {
        final long currentStreamTime = partitionGroup.streamTime();
        if (currentStreamTime != -1) {
            final boolean punctuated =
                streamTimePunctuationQueue.mayPunctuate(currentStreamTime, PunctuationType.STREAM_TIME, this);
            if (punctuated) {
                commitNeeded = true;
            }
            return punctuated;
        }
        return false;
    }

    /**
     * Triggers wall-clock punctuations that are due at the current {@code time.milliseconds()}.
     *
     * <p>When the queue reports that it punctuated, the task is marked as needing a commit.
     *
     * @return the result of the system-time queue's {@code mayPunctuate}
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean maybePunctuateSystemTime() {
        final long nowMs = time.milliseconds();
        final boolean punctuated =
            systemTimePunctuationQueue.mayPunctuate(nowMs, PunctuationType.WALL_CLOCK_TIME, this);
        if (!punctuated) {
            return false;
        }
        commitNeeded = true;
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records {@code j2 - j} on the end-to-end latency sensor registered for the given node, using
     * {@code j2} as the second argument of {@code Sensor.record}.
     *
     * <p>Nothing is recorded unless the sensor both should record and has metrics.
     *
     * @param j   value subtracted from {@code j2} (presumably the record timestamp; confirm against callers)
     * @param j2  minuend and recording time (presumably the current time; confirm against callers)
     * @param str node name used to look up the sensor
     * @throws IllegalStateException if no sensor is registered under {@code str}
     */
    public void maybeRecordE2ELatency(long j, long j2, String str) {
        final Sensor e2eSensor = e2eLatencySensors.get(str);
        if (e2eSensor == null) {
            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + str);
        }
        // Short-circuit order matches the original: hasMetrics() is only consulted when shouldRecord() is true.
        if (!e2eSensor.shouldRecord() || !e2eSensor.hasMetrics()) {
            return;
        }
        final long latency = j2 - j;
        e2eSensor.record(latency, j2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Sets the flag that {@link #commitRequested()} reports. */
    public void requestCommit() {
        commitRequested = true;
    }

    /**
     * @return the current value of the commit-requested flag, which {@link #requestCommit()} sets to
     *         {@code true}
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean commitRequested() {
        return commitRequested;
    }

    /**
     * Encodes a timestamp as a Base64 string.
     *
     * <p>The encoded payload is nine bytes: a leading version byte with value {@code 1}, followed by
     * the timestamp as an eight-byte big-endian long ({@link ByteBuffer}'s default byte order).
     *
     * @param j timestamp to encode
     * @return Base64 encoding of the nine-byte payload
     */
    static String encodeTimestamp(long j) {
        final byte[] payload = ByteBuffer.allocate(9)
            .put((byte) 1)
            .putLong(j)
            .array();
        return Base64.getEncoder().encodeToString(payload);
    }

    /**
     * Decodes a timestamp from a Base64 metadata string that starts with a version byte.
     *
     * <p>Only version {@code 1} is understood: one version byte followed by an eight-byte long.
     * Metadata that cannot be interpreted does not raise an exception. A warning is logged and
     * {@code -1} is returned instead. This covers invalid Base64, a payload too short to hold the
     * long, and an unsupported version byte.
     *
     * @param str Base64-encoded metadata; must not be {@code null}
     * @return the decoded timestamp, or {@code -1} if {@code str} is empty or cannot be decoded
     */
    long decodeTimestamp(String str) {
        if (str.isEmpty()) {
            return -1L;
        }

        final ByteBuffer buffer;
        try {
            buffer = ByteBuffer.wrap(Base64.getDecoder().decode(str));
        } catch (IllegalArgumentException e) {
            // Base64.Decoder#decode rejects input that is not valid Base64.
            this.log.warn("Unsupported offset metadata found: metadata is not valid Base64.", e);
            return -1L;
        }

        if (!buffer.hasRemaining()) {
            this.log.warn("Unsupported offset metadata found: metadata carries no version byte.");
            return -1L;
        }

        final byte version = buffer.get();
        switch (version) {
            case 1:
                // Guard against truncated metadata so getLong() cannot underflow the buffer.
                if (buffer.remaining() < Long.BYTES) {
                    this.log.warn("Unsupported offset metadata found: expected {} timestamp bytes but found {}.", Long.BYTES, buffer.remaining());
                    return -1L;
                }
                return buffer.getLong();
            default:
                this.log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.", (byte) 1, version);
                return -1L;
        }
    }

    /** @return the processor context held by this task */
    public InternalProcessorContext processorContext() {
        return processorContext;
    }

    /** @return the description produced by {@link #toString(String)} with an empty indent */
    public String toString() {
        final String noIndent = "";
        return toString(noIndent);
    }

    /**
     * Builds a multi-line description of this task.
     *
     * <p>The output has a {@code TaskId:} line, then the topology description (if a topology is set)
     * rendered with one extra tab of indentation, then a {@code Partitions [...]} line when the task
     * has input partitions.
     *
     * @param str indent prefix placed in front of each section
     * @return the description
     */
    public String toString(String str) {
        final StringBuilder sb = new StringBuilder();
        sb.append(str).append("TaskId: ").append(this.id).append("\n");

        if (this.topology != null) {
            // A plain tab literal: this class must not depend on a logging backend's constant for
            // a whitespace character.
            sb.append(str).append(this.topology.toString(str + "\t"));
        }

        final Set<TopicPartition> inputPartitions = inputPartitions();
        if (inputPartitions != null && !inputPartitions.isEmpty()) {
            sb.append(str).append(
                inputPartitions.stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(", ", "Partitions [", "]\n")));
        }
        return sb.toString();
    }

    /** @return the current value of the commit-needed flag */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean commitNeeded() {
        return commitNeeded;
    }

    /**
     * Reports the changelog offsets of this task.
     *
     * <p>While the task is in state {@code RUNNING}, every changelog partition is mapped to the fixed
     * value {@code -2} (presumably the "latest offset" sentinel; confirm against {@code Task}). In
     * any other state the state manager's changelog offsets are returned as an unmodifiable view.
     *
     * @return changelog offsets keyed by changelog partition
     */
    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> changelogOffsets() {
        if (state() == Task.State.RUNNING) {
            return changelogPartitions().stream()
                .collect(Collectors.toMap(Function.identity(), topicPartition -> -2L));
        }
        return Collections.unmodifiableMap(this.stateMgr.changelogOffsets());
    }

    /** @return {@code true} if {@link #numBuffered()} is greater than zero */
    public boolean hasRecordsQueued() {
        final int buffered = numBuffered();
        return buffered > 0;
    }

    /** @return the record collector held by this task */
    RecordCollector recordCollector() {
        return recordCollector;
    }

    /** @return the buffered-record count reported by the partition group */
    int numBuffered() {
        final int buffered = partitionGroup.numBuffered();
        return buffered;
    }

    /** @return the stream time reported by the partition group */
    long streamTime() {
        final long current = partitionGroup.streamTime();
        return current;
    }
}
