/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.time.Duration;
import java.util.ArrayList;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.junit.Assert;

public class MockProcessor<K, V>
extends AbstractProcessor<K, V> {
    public final ArrayList<String> processed = new ArrayList();
    public final ArrayList<K> processedKeys = new ArrayList();
    public final ArrayList<V> processedValues = new ArrayList();
    public final ArrayList<Long> punctuatedStreamTime = new ArrayList();
    public final ArrayList<Long> punctuatedSystemTime = new ArrayList();
    public Cancellable scheduleCancellable;
    private final PunctuationType punctuationType;
    private final long scheduleInterval;
    private boolean commitRequested = false;

    public MockProcessor(PunctuationType punctuationType, long scheduleInterval) {
        this.punctuationType = punctuationType;
        this.scheduleInterval = scheduleInterval;
    }

    public MockProcessor() {
        this(PunctuationType.STREAM_TIME, -1L);
    }

    public void init(ProcessorContext context) {
        super.init(context);
        if (this.scheduleInterval > 0L) {
            this.scheduleCancellable = context.schedule(Duration.ofMillis(this.scheduleInterval), this.punctuationType, new Punctuator(){

                public void punctuate(long timestamp) {
                    if (MockProcessor.this.punctuationType == PunctuationType.STREAM_TIME) {
                        Assert.assertEquals((long)timestamp, (long)MockProcessor.this.context().timestamp());
                    }
                    Assert.assertEquals((long)-1L, (long)MockProcessor.this.context().partition());
                    Assert.assertEquals((long)-1L, (long)MockProcessor.this.context().offset());
                    (MockProcessor.this.punctuationType == PunctuationType.STREAM_TIME ? MockProcessor.this.punctuatedStreamTime : MockProcessor.this.punctuatedSystemTime).add(timestamp);
                }
            });
        }
    }

    public void process(K key, V value) {
        this.processedKeys.add(key);
        this.processedValues.add(value);
        this.processed.add((key == null ? "null" : key) + ":" + (value == null ? "null" : value));
        if (this.commitRequested) {
            this.context().commit();
            this.commitRequested = false;
        }
    }

    public void checkAndClearProcessResult(String ... expected) {
        Assert.assertEquals((String)("the number of outputs:" + this.processed), (long)expected.length, (long)this.processed.size());
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals((String)("output[" + i + "]:"), (Object)expected[i], (Object)this.processed.get(i));
        }
        this.processed.clear();
    }

    public void requestCommit() {
        this.commitRequested = true;
    }

    public void checkEmptyAndClearProcessResult() {
        Assert.assertEquals((String)"the number of outputs:", (long)0L, (long)this.processed.size());
        this.processed.clear();
    }

    public void checkAndClearPunctuateResult(PunctuationType type, long ... expected) {
        ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? this.punctuatedStreamTime : this.punctuatedSystemTime;
        Assert.assertEquals((String)"the number of outputs:", (long)expected.length, (long)punctuated.size());
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals((String)("output[" + i + "]:"), (long)expected[i], (long)punctuated.get(i));
        }
        this.processed.clear();
    }
}

