package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.class */
public class KStreamKStreamLeftJoinTest {
    private static final String[] EMPTY = new String[0];
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Test
    public void testLeftJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).leftJoin(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                MockProcessor theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
                for (int i2 = 0; i2 < 2; i2++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i2]), "a" + iArr[i2]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
                for (int i3 = 0; i3 < 3; i3++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i3]), "B" + iArr[i3]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)");
                for (int i4 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i4), "b" + i4));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)");
                for (int i5 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i5), "C" + i5));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).leftJoin(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            MockProcessor<Integer, String> theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
            for (int i = 0; i < 2; i++) {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i], 0L));
            }
            theCapturedProcessor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
            for (int i2 : iArr) {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i2), "a" + i2, 0L));
            }
            theCapturedProcessor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
            testUpperWindowBound(iArr, topologyTestDriver, theCapturedProcessor);
            testLowerWindowBound(iArr, topologyTestDriver, theCapturedProcessor);
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    private void testUpperWindowBound(int[] iArr, TopologyTestDriver topologyTestDriver, MockProcessor<Integer, String> mockProcessor) {
        for (int i = 0; i < iArr.length; i++) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i]), "b" + iArr[i], 1000 + i));
        }
        mockProcessor.checkAndClearProcessResult(EMPTY);
        for (int i2 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i2), "B" + i2, 1100L));
        }
        mockProcessor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
        long j = 1100 + 1;
        for (int i3 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i3), "C" + i3, j));
        }
        mockProcessor.checkAndClearProcessResult("0:C0+null (ts: 1101)", "1:C1+b1 (ts: 1101)", "2:C2+b2 (ts: 1101)", "3:C3+b3 (ts: 1101)");
        long j2 = j + 1;
        for (int i4 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i4), "D" + i4, j2));
        }
        mockProcessor.checkAndClearProcessResult("0:D0+null (ts: 1102)", "1:D1+null (ts: 1102)", "2:D2+b2 (ts: 1102)", "3:D3+b3 (ts: 1102)");
        long j3 = j2 + 1;
        for (int i5 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i5), "E" + i5, j3));
        }
        mockProcessor.checkAndClearProcessResult("0:E0+null (ts: 1103)", "1:E1+null (ts: 1103)", "2:E2+null (ts: 1103)", "3:E3+b3 (ts: 1103)");
        long j4 = j3 + 1;
        for (int i6 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i6), "F" + i6, j4));
        }
        mockProcessor.checkAndClearProcessResult("0:F0+null (ts: 1104)", "1:F1+null (ts: 1104)", "2:F2+null (ts: 1104)", "3:F3+null (ts: 1104)");
    }

    private void testLowerWindowBound(int[] iArr, TopologyTestDriver topologyTestDriver, MockProcessor<Integer, String> mockProcessor) {
        for (int i : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i), "G" + i, 899L));
        }
        mockProcessor.checkAndClearProcessResult("0:G0+null (ts: 899)", "1:G1+null (ts: 899)", "2:G2+null (ts: 899)", "3:G3+null (ts: 899)");
        long j = 899 + 1;
        for (int i2 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i2), "H" + i2, j));
        }
        mockProcessor.checkAndClearProcessResult("0:H0+b0 (ts: 1000)", "1:H1+null (ts: 900)", "2:H2+null (ts: 900)", "3:H3+null (ts: 900)");
        long j2 = j + 1;
        for (int i3 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i3), "I" + i3, j2));
        }
        mockProcessor.checkAndClearProcessResult("0:I0+b0 (ts: 1000)", "1:I1+b1 (ts: 1001)", "2:I2+null (ts: 901)", "3:I3+null (ts: 901)");
        long j3 = j2 + 1;
        for (int i4 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i4), "J" + i4, j3));
        }
        mockProcessor.checkAndClearProcessResult("0:J0+b0 (ts: 1000)", "1:J1+b1 (ts: 1001)", "2:J2+b2 (ts: 1002)", "3:J3+null (ts: 902)");
        long j4 = j3 + 1;
        for (int i5 : iArr) {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i5), "K" + i5, j4));
        }
        mockProcessor.checkAndClearProcessResult("0:K0+b0 (ts: 1000)", "1:K1+b1 (ts: 1001)", "2:K2+b2 (ts: 1002)", "3:K3+b3 (ts: 1003)");
    }
}
