package org.apache.kafka.streams.kstream.internals.suppress;

import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.1.0.jar:org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.class */
/**
 * Processor that holds back {@link Change} records in a {@link TimeOrderedKeyValueBuffer}
 * and forwards them downstream once they are evicted from that buffer.
 *
 * <p>For every incoming record the processor:
 * <ol>
 *   <li>serializes the key and the change and puts them into the buffer, together with the
 *       record context that was current when the record arrived;</li>
 *   <li>asks the buffer to evict while its minimum timestamp is at or before
 *       {@code streamTime - suppressDurationMillis};</li>
 *   <li>if the buffer then holds more than {@code maxRecords} records or more than
 *       {@code maxBytes} bytes, applies the configured {@link BufferFullStrategy}.</li>
 * </ol>
 *
 * @param <K> key type
 * @param <V> value type carried inside the {@link Change}
 */
public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
    private final long maxRecords;
    private final long maxBytes;
    private final long suppressDurationMillis;
    private final TimeDefinitions.TimeDefinition<K> bufferTimeDefinition;
    private final BufferFullStrategy bufferFullStrategy;
    private final boolean shouldSuppressTombstones;
    private final String storeName;
    private TimeOrderedKeyValueBuffer buffer;
    private InternalProcessorContext internalProcessorContext;
    // The serdes are not final: init() replaces a null serde with the one from the context.
    private Serde<K> keySerde;
    private FullChangeSerde<V> valueSerde;

    /**
     * Creates the processor from a suppression configuration.
     *
     * @param suppressedInternal suppression settings (buffer limits, wait time, time definition,
     *                           buffer-full strategy, tombstone handling); must not be null
     * @param str                name of the state store that is looked up in {@link #init}
     * @param serde              key serde, or null to fall back to the context's key serde
     * @param fullChangeSerde    value serde, or null to fall back to the context's value serde
     * @throws NullPointerException if {@code suppressedInternal} is null
     */
    public KTableSuppressProcessor(SuppressedInternal<K> suppressedInternal, String str, Serde<K> serde, FullChangeSerde<V> fullChangeSerde) {
        Objects.requireNonNull(suppressedInternal, "suppressedInternal");
        this.storeName = str;
        this.keySerde = serde;
        this.valueSerde = fullChangeSerde;
        this.maxRecords = suppressedInternal.bufferConfig().maxRecords();
        this.maxBytes = suppressedInternal.bufferConfig().maxBytes();
        this.suppressDurationMillis = suppressedInternal.timeToWaitForMoreEvents().toMillis();
        this.bufferTimeDefinition = suppressedInternal.timeDefinition();
        this.bufferFullStrategy = suppressedInternal.bufferConfig().bufferFullStrategy();
        this.shouldSuppressTombstones = suppressedInternal.shouldSuppressTombstones();
    }

    /**
     * Captures the processor context, fills in any serde that was not supplied to the
     * constructor, and looks up the buffer state store by name.
     *
     * @param processorContext the context; it is cast to {@link InternalProcessorContext}
     * @throws NullPointerException if the context returns no state store for the store name
     * @throws ClassCastException   if the context or the state store is not of the expected type
     */
    @Override // org.apache.kafka.streams.processor.Processor
    @SuppressWarnings("unchecked") // the context only exposes Serde<?>; the key serde is assumed to match K
    public void init(ProcessorContext processorContext) {
        this.internalProcessorContext = (InternalProcessorContext) processorContext;
        if (this.keySerde == null) {
            this.keySerde = (Serde<K>) processorContext.keySerde();
        }
        if (this.valueSerde == null) {
            this.valueSerde = FullChangeSerde.castOrWrap(processorContext.valueSerde());
        }
        this.buffer = Objects.requireNonNull(
            (TimeOrderedKeyValueBuffer) processorContext.getStateStore(this.storeName),
            "state store " + this.storeName);
    }

    /**
     * Buffers the record and then enforces the time and capacity constraints.
     *
     * @param k      record key
     * @param change record value
     * @throws StreamsException if the buffer is over capacity and the strategy is {@code SHUT_DOWN}
     */
    @Override // org.apache.kafka.streams.processor.Processor
    public void process(K k, Change<V> change) {
        buffer(k, change);
        enforceConstraints();
    }

    /** Serializes the record and stores it in the buffer along with the current record context. */
    private void buffer(K k, Change<V> change) {
        final long bufferTime = this.bufferTimeDefinition.time(this.internalProcessorContext, k);
        final ProcessorRecordContext recordContext = this.internalProcessorContext.recordContext();
        // Both serializers are invoked with a null topic.
        final Bytes serializedKey = Bytes.wrap(this.keySerde.serializer().serialize(null, k));
        final byte[] serializedValue = this.valueSerde.serializer().serialize(null, change);
        this.buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
    }

    /**
     * Evicts records that have waited long enough, then applies the buffer-full strategy if the
     * buffer is still over capacity.
     */
    private void enforceConstraints() {
        // Cutoff timestamp: eviction continues while the buffer's minimum timestamp is <= this value.
        final long expiryTime = this.internalProcessorContext.streamTime() - this.suppressDurationMillis;
        this.buffer.evictWhile(() -> this.buffer.minTimestamp() <= expiryTime, this::emit);

        if (!overCapacity()) {
            return;
        }
        switch (this.bufferFullStrategy) {
            case EMIT:
                // Keep evicting (and forwarding) until the buffer is back within its limits.
                this.buffer.evictWhile(this::overCapacity, this::emit);
                break;
            case SHUT_DOWN:
                throw new StreamsException(String.format(
                    "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.",
                    this.internalProcessorContext.currentNode().name(),
                    this.buffer.numRecords(), this.maxRecords,
                    this.buffer.bufferSize(), this.maxBytes));
            default:
                // Any other strategy: nothing is evicted here.
                break;
        }
    }

    /** Returns true if the buffer exceeds either the record-count limit or the byte-size limit. */
    private boolean overCapacity() {
        return this.buffer.numRecords() > this.maxRecords || this.buffer.bufferSize() > this.maxBytes;
    }

    /**
     * Forwards one evicted record downstream, unless it is filtered out by {@link #shouldForward}.
     *
     * <p>While forwarding, the processor context's record context is replaced by the one stored
     * with the buffered record; the previous record context is always restored afterwards.
     */
    private void emit(KeyValue<Bytes, ContextualRecord> keyValue) {
        final Change<V> change = this.valueSerde.deserializer().deserialize(null, keyValue.value.value());
        if (!shouldForward(change)) {
            return;
        }
        final ProcessorRecordContext previousContext = this.internalProcessorContext.recordContext();
        this.internalProcessorContext.setRecordContext(keyValue.value.recordContext());
        try {
            final K key = this.keySerde.deserializer().deserialize(null, keyValue.key.get());
            this.internalProcessorContext.forward(key, change);
        } finally {
            // Restore on both normal and exceptional exit.
            this.internalProcessorContext.setRecordContext(previousContext);
        }
    }

    /**
     * Returns false only for a change whose new value is null while tombstone suppression is
     * enabled; every other change is forwarded.
     */
    private boolean shouldForward(Change<V> change) {
        return change.newValue != null || !this.shouldSuppressTombstones;
    }

    /** No-op: this processor releases nothing on close. */
    @Override // org.apache.kafka.streams.processor.Processor
    public void close() {
    }

    // The erased bridge method process(Object, Object) is not declared here: the compiler
    // generates it automatically for the @Override of process(K, Change<V>) above.
}
