package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.1.1.jar:org/apache/kafka/streams/kstream/internals/KStreamTransform.class */
/**
 * {@link ProcessorSupplier} that adapts a {@link TransformerSupplier} to the processor API.
 *
 * <p>Every call to {@link #get()} asks the wrapped supplier for a {@link Transformer} and wraps
 * it in a {@link KStreamTransformProcessor}.
 *
 * @param <K>  key type of the records fed into the processor
 * @param <V>  value type of the records fed into the processor
 * @param <K1> key type of the {@link KeyValue} pairs produced by the transformer
 * @param <V1> value type of the {@link KeyValue} pairs produced by the transformer
 */
public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
    private final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier;

    /**
     * Processor that delegates its lifecycle and record handling to a {@link Transformer} and
     * forwards every non-null {@link KeyValue} the transformer returns.
     *
     * @param <K1> key type of the incoming records
     * @param <V1> value type of the incoming records
     * @param <K2> key type of the pairs produced by the transformer
     * @param <V2> value type of the pairs produced by the transformer
     */
    public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1, V1> {
        private final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer;

        /**
         * @param transformer the transformer every call on this processor is delegated to
         */
        public KStreamTransformProcessor(Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer) {
            this.transformer = transformer;
        }

        /**
         * Initializes this processor and then the wrapped transformer with the same context.
         *
         * @param processorContext the context handed to both the superclass and the transformer
         */
        @Override
        public void init(ProcessorContext processorContext) {
            // Superclass first: process()/punctuate() below rely on context() being available.
            super.init(processorContext);
            transformer.init(processorContext);
        }

        /**
         * Transforms one record and forwards the result downstream.
         *
         * <p>A {@code null} result from the transformer is silently dropped (nothing is forwarded).
         *
         * @param key   key of the incoming record
         * @param value value of the incoming record
         */
        @Override
        public void process(K1 key, V1 value) {
            final KeyValue<? extends K2, ? extends V2> result = transformer.transform(key, value);
            if (result != null) {
                context().forward(result.key, result.value);
            }
        }

        /**
         * Delegates a punctuation call to the transformer and forwards its result, if any.
         *
         * <p>A {@code null} result from the transformer is silently dropped.
         *
         * @param timestamp the value passed through unchanged to {@link Transformer#punctuate(long)}
         */
        // punctuate(long) is deprecated in Kafka Streams 1.x; it is still bridged here so that
        // transformers relying on the legacy callback keep working.
        @SuppressWarnings("deprecation")
        @Override
        public void punctuate(long timestamp) {
            final KeyValue<? extends K2, ? extends V2> result = transformer.punctuate(timestamp);
            if (result != null) {
                context().forward(result.key, result.value);
            }
        }

        /**
         * Closes the wrapped transformer.
         */
        @Override
        public void close() {
            transformer.close();
        }
    }

    /**
     * @param transformerSupplier source of the transformers wrapped by the processors that
     *                            {@link #get()} creates
     */
    public KStreamTransform(TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier) {
        this.transformerSupplier = transformerSupplier;
    }

    /**
     * Creates a processor around a transformer obtained from the supplier.
     *
     * @return a new {@link KStreamTransformProcessor} wrapping {@code transformerSupplier.get()}
     */
    @Override
    public Processor<K, V> get() {
        // Diamond instead of a raw type so the compiler checks the transformer's generic bounds.
        return new KStreamTransformProcessor<>(transformerSupplier.get());
    }
}
