package org.apache.kafka.streams;

import java.io.File;
import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/MockProcessorContextTest.class */
public class MockProcessorContextTest {
    @Test
    public void shouldCaptureOutputRecords() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.1
            public void process(String str, Long l) {
                context().forward(str + l, Long.valueOf(str.length() + l.longValue()));
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        abstractProcessor.init(mockProcessorContext);
        abstractProcessor.process("foo", 5L);
        abstractProcessor.process("barbaz", 50L);
        Iterator it = mockProcessorContext.forwarded().iterator();
        Assert.assertEquals(new KeyValue("foo5", 8L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("barbaz50", 56L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertFalse(it.hasNext());
        mockProcessorContext.resetForwards();
        Assert.assertEquals(0L, mockProcessorContext.forwarded().size());
    }

    @Test
    public void shouldCaptureOutputRecordsUsingTo() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.2
            public void process(String str, Long l) {
                context().forward(str + l, Long.valueOf(str.length() + l.longValue()), To.all());
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        abstractProcessor.init(mockProcessorContext);
        abstractProcessor.process("foo", 5L);
        abstractProcessor.process("barbaz", 50L);
        Iterator it = mockProcessorContext.forwarded().iterator();
        Assert.assertEquals(new KeyValue("foo5", 8L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("barbaz50", 56L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertFalse(it.hasNext());
        mockProcessorContext.resetForwards();
        Assert.assertEquals(0L, mockProcessorContext.forwarded().size());
    }

    @Test
    public void shouldCaptureRecordsOutputToChildByName() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.3
            private int count = 0;

            public void process(String str, Long l) {
                if (this.count == 0) {
                    context().forward("start", -1L, To.all());
                }
                context().forward(str + l, Long.valueOf(str.length() + l.longValue()), this.count % 2 == 0 ? To.child("george") : To.child("pete"));
                this.count++;
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        abstractProcessor.init(mockProcessorContext);
        abstractProcessor.process("foo", 5L);
        abstractProcessor.process("barbaz", 50L);
        Iterator it = mockProcessorContext.forwarded().iterator();
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward) it.next();
        Assert.assertEquals(new KeyValue("start", -1L), capturedForward.keyValue());
        Assert.assertNull(capturedForward.childName());
        MockProcessorContext.CapturedForward capturedForward2 = (MockProcessorContext.CapturedForward) it.next();
        Assert.assertEquals(new KeyValue("foo5", 8L), capturedForward2.keyValue());
        Assert.assertEquals("george", capturedForward2.childName());
        MockProcessorContext.CapturedForward capturedForward3 = (MockProcessorContext.CapturedForward) it.next();
        Assert.assertEquals(new KeyValue("barbaz50", 56L), capturedForward3.keyValue());
        Assert.assertEquals("pete", capturedForward3.childName());
        Assert.assertFalse(it.hasNext());
        Iterator it2 = mockProcessorContext.forwarded("george").iterator();
        Assert.assertEquals(new KeyValue("start", -1L), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("foo5", 8L), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertFalse(it2.hasNext());
        Iterator it3 = mockProcessorContext.forwarded("pete").iterator();
        Assert.assertEquals(new KeyValue("start", -1L), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("barbaz50", 56L), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertFalse(it3.hasNext());
        Iterator it4 = mockProcessorContext.forwarded("steve").iterator();
        Assert.assertEquals(new KeyValue("start", -1L), ((MockProcessorContext.CapturedForward) it4.next()).keyValue());
        Assert.assertFalse(it4.hasNext());
    }

    @Test
    public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.4
            public void process(String str, Long l) {
                context().forward(str, l, 0);
            }
        };
        abstractProcessor.init(new MockProcessorContext());
        try {
            abstractProcessor.process("foo", 5L);
            Assert.fail("Should have thrown an UnsupportedOperationException.");
        } catch (UnsupportedOperationException e) {
        }
    }

    @Test
    public void shouldThrowIfForwardedWithDeprecatedChildName() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.5
            public void process(String str, Long l) {
                context().forward(str, l, "child1");
            }
        };
        abstractProcessor.init(new MockProcessorContext());
        try {
            abstractProcessor.process("foo", 5L);
            Assert.fail("Should have thrown an UnsupportedOperationException.");
        } catch (UnsupportedOperationException e) {
        }
    }

    @Test
    public void shouldCaptureCommitsAndAllowReset() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.6
            private int count = 0;

            public void process(String str, Long l) {
                int i = this.count + 1;
                this.count = i;
                if (i > 2) {
                    context().commit();
                }
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        abstractProcessor.init(mockProcessorContext);
        abstractProcessor.process("foo", 5L);
        abstractProcessor.process("barbaz", 50L);
        Assert.assertFalse(mockProcessorContext.committed());
        abstractProcessor.process("foobar", 500L);
        Assert.assertTrue(mockProcessorContext.committed());
        mockProcessorContext.resetCommit();
        Assert.assertFalse(mockProcessorContext.committed());
    }

    @Test
    public void shouldStoreAndReturnStateStores() {
        AbstractProcessor<String, Long> abstractProcessor = new AbstractProcessor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.7
            public void process(String str, Long l) {
                KeyValueStore stateStore = context().getStateStore("my-state");
                stateStore.put(str, Long.valueOf((stateStore.get(str) == null ? 0L : ((Long) stateStore.get(str)).longValue()) + l.longValue()));
                stateStore.put("all", Long.valueOf((stateStore.get("all") == null ? 0L : ((Long) stateStore.get("all")).longValue()) + l.longValue()));
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        KeyValueStore build = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-state"), Serdes.String(), Serdes.Long()).withLoggingDisabled().build();
        build.init(mockProcessorContext, build);
        abstractProcessor.init(mockProcessorContext);
        abstractProcessor.process("foo", 5L);
        abstractProcessor.process("bar", 50L);
        Assert.assertEquals(5L, ((Long) build.get("foo")).longValue());
        Assert.assertEquals(50L, ((Long) build.get("bar")).longValue());
        Assert.assertEquals(55L, ((Long) build.get("all")).longValue());
    }

    @Test
    public void shouldCaptureApplicationAndRecordMetadata() {
        Properties properties = new Properties();
        properties.put("application.id", "testMetadata");
        properties.put("bootstrap.servers", "");
        AbstractProcessor<String, Object> abstractProcessor = new AbstractProcessor<String, Object>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.8
            public void process(String str, Object obj) {
                context().forward("appId", context().applicationId());
                context().forward("taskId", context().taskId());
                context().forward("topic", context().topic());
                context().forward("partition", Integer.valueOf(context().partition()));
                context().forward("offset", Long.valueOf(context().offset()));
                context().forward("timestamp", Long.valueOf(context().timestamp()));
                context().forward("key", str);
                context().forward("value", obj);
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext(properties);
        abstractProcessor.init(mockProcessorContext);
        try {
            abstractProcessor.process("foo", 5L);
            Assert.fail("Should have thrown an exception.");
        } catch (IllegalStateException e) {
        }
        mockProcessorContext.resetForwards();
        mockProcessorContext.setRecordMetadata("t1", 0, 0L, (Headers) null, 0L);
        abstractProcessor.process("foo", 5L);
        Iterator it = mockProcessorContext.forwarded().iterator();
        Assert.assertEquals(new KeyValue("appId", "testMetadata"), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("taskId", new TaskId(0, 0)), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("topic", "t1"), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("partition", 0), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("offset", 0L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("timestamp", 0L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("key", "foo"), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        Assert.assertEquals(new KeyValue("value", 5L), ((MockProcessorContext.CapturedForward) it.next()).keyValue());
        mockProcessorContext.resetForwards();
        mockProcessorContext.setOffset(1L);
        mockProcessorContext.setTimestamp(10L);
        abstractProcessor.process("bar", 50L);
        Iterator it2 = mockProcessorContext.forwarded().iterator();
        Assert.assertEquals(new KeyValue("appId", "testMetadata"), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("taskId", new TaskId(0, 0)), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("topic", "t1"), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("partition", 0), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("offset", 1L), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("timestamp", 10L), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("key", "bar"), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        Assert.assertEquals(new KeyValue("value", 50L), ((MockProcessorContext.CapturedForward) it2.next()).keyValue());
        mockProcessorContext.resetForwards();
        mockProcessorContext.setTopic("t2");
        mockProcessorContext.setPartition(30);
        abstractProcessor.process("baz", 500L);
        Iterator it3 = mockProcessorContext.forwarded().iterator();
        Assert.assertEquals(new KeyValue("appId", "testMetadata"), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("taskId", new TaskId(0, 0)), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("topic", "t2"), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("partition", 30), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("offset", 1L), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("timestamp", 10L), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("key", "baz"), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
        Assert.assertEquals(new KeyValue("value", 500L), ((MockProcessorContext.CapturedForward) it3.next()).keyValue());
    }

    @Test
    public void shouldCapturePunctuator() {
        Processor<String, Long> processor = new Processor<String, Long>() { // from class: org.apache.kafka.streams.MockProcessorContextTest.9
            public void init(ProcessorContext processorContext) {
                processorContext.schedule(Duration.ofSeconds(1L), PunctuationType.WALL_CLOCK_TIME, j -> {
                    processorContext.commit();
                });
            }

            public void process(String str, Long l) {
            }

            public void close() {
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        MockProcessorContext.CapturedPunctuator capturedPunctuator = (MockProcessorContext.CapturedPunctuator) mockProcessorContext.scheduledPunctuators().get(0);
        Assert.assertEquals(1000L, capturedPunctuator.getIntervalMs());
        Assert.assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
        Assert.assertFalse(capturedPunctuator.cancelled());
        Punctuator punctuator = capturedPunctuator.getPunctuator();
        Assert.assertFalse(mockProcessorContext.committed());
        punctuator.punctuate(1234L);
        Assert.assertTrue(mockProcessorContext.committed());
    }

    @Test
    public void fullConstructorShouldSetAllExpectedAttributes() {
        Properties properties = new Properties();
        properties.put("application.id", "testFullConstructor");
        properties.put("bootstrap.servers", "");
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Long().getClass());
        File file = new File("");
        MockProcessorContext mockProcessorContext = new MockProcessorContext(properties, new TaskId(1, 1), file);
        Assert.assertEquals("testFullConstructor", mockProcessorContext.applicationId());
        Assert.assertEquals(new TaskId(1, 1), mockProcessorContext.taskId());
        Assert.assertEquals("testFullConstructor", mockProcessorContext.appConfigs().get("application.id"));
        Assert.assertEquals("testFullConstructor", mockProcessorContext.appConfigsWithPrefix("application.").get("id"));
        Assert.assertEquals(Serdes.String().getClass(), mockProcessorContext.keySerde().getClass());
        Assert.assertEquals(Serdes.Long().getClass(), mockProcessorContext.valueSerde().getClass());
        Assert.assertEquals(file, mockProcessorContext.stateDir());
    }
}
