package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/SinkNodeTest.class */
/**
 * Tests the failure paths of {@link SinkNode#process}: every test feeds the sink an input it
 * is expected to reject and verifies that a {@link StreamsException} is thrown.
 *
 * <p>The sink under test is configured with byte-array serializers for both key and value and
 * writes to the fixed topic {@code "any-output-topic"} through a {@link RecordCollectorImpl}
 * backed by a {@link MockProducer}.
 */
public class SinkNodeTest {
    private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
    private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, this.anySerializer, this.anySerializer);
    private final RecordCollector recordCollector = new RecordCollectorImpl(
            this.producer,
            (String) null,
            new LogContext("sinknode-test "),
            new DefaultProductionExceptionHandler(),
            new Metrics().sensor("skipped-records"));
    private final InternalMockProcessorContext context = new InternalMockProcessorContext(this.anyStateSerde, this.recordCollector);

    // Deliberately a raw type: the tests below hand the sink keys/values whose types do not
    // match the byte[] serializers, which a parameterized SinkNode would reject at compile time.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private final SinkNode sink = new SinkNode("anyNodeName", new StaticTopicNameExtractor("any-output-topic"), this.anySerializer, this.anySerializer, (StreamPartitioner) null);

    /** Initializes the sink with the mock processor context before each test. */
    @Before
    public void before() {
        this.sink.init(this.context);
    }

    /**
     * Sets the context time to {@code -1} and expects {@code process} to fail with a
     * {@link StreamsException}.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
        // Explicit charset so the fixture bytes do not depend on the platform default.
        final Bytes anyKey = new Bytes("any key".getBytes(StandardCharsets.UTF_8));
        final Bytes anyValue = new Bytes("any value".getBytes(StandardCharsets.UTF_8));

        this.context.setTime(-1L);
        try {
            this.sink.process(anyKey, anyValue);
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException expected) {
            // Expected: only the exception type is under test here, so there is nothing
            // further to assert on the caught exception.
        }
    }

    /**
     * Passes {@code String} key and value to a sink configured with byte-array serializers and
     * expects a {@link StreamsException} caused by a {@link ClassCastException}.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldThrowStreamsExceptionOnKeyValueTypeSerializerMismatch() {
        this.context.setTime(0L);
        try {
            this.sink.process("key with different type", "value with different type");
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException expected) {
            MatcherAssert.assertThat(expected.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
        }
    }

    /**
     * Passes a {@code null} key with a mistyped ({@code String}) value and expects a
     * {@link StreamsException} caused by a {@link ClassCastException} whose message reports the
     * key type as unknown because the key is null.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldHandleNullKeysWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
        this.context.setTime(1L);
        try {
            this.sink.process(null, "");
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException expected) {
            MatcherAssert.assertThat(expected.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
            MatcherAssert.assertThat(expected.getMessage(), CoreMatchers.containsString("unknown because key is null"));
        }
    }

    /**
     * Passes a mistyped ({@code String}) key with a {@code null} value and expects a
     * {@link StreamsException} caused by a {@link ClassCastException} whose message reports the
     * value type as unknown because the value is null.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldHandleNullValuesWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
        this.context.setTime(1L);
        try {
            this.sink.process("", null);
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException expected) {
            MatcherAssert.assertThat(expected.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
            MatcherAssert.assertThat(expected.getMessage(), CoreMatchers.containsString("unknown because value is null"));
        }
    }
}
