package org.apache.flink.connector.kinesis.source.reader;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.connector.kinesis.source.util.TestUtil;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.SdkBytes;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.class */
class KinesisStreamsRecordEmitterTest {
    private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();

    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest$AssertRecordMetadataDeserializationSchema.class */
    private static class AssertRecordMetadataDeserializationSchema implements KinesisDeserializationSchema<String> {
        private final String expectedStreamArn;
        private final String expectedShardId;

        private AssertRecordMetadataDeserializationSchema(String str, String str2) {
            this.expectedStreamArn = str;
            this.expectedShardId = str2;
        }

        public void deserialize(Record record, String str, String str2, Collector<String> collector) throws IOException {
            AssertionsForInterfaceTypes.assertThat(str).isEqualTo(this.expectedStreamArn);
            AssertionsForInterfaceTypes.assertThat(str2).isEqualTo(this.expectedShardId);
            KinesisStreamsRecordEmitterTest.STRING_SCHEMA.deserialize(record.data().asByteArray(), collector);
        }

        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest$CapturingSourceOutput.class */
    private static class CapturingSourceOutput<T> implements SourceOutput<T> {
        private final List<T> emittedRecords;
        private final List<Long> emittedTimestamps;

        private CapturingSourceOutput() {
            this.emittedRecords = new ArrayList();
            this.emittedTimestamps = new ArrayList();
        }

        public void collect(T t) {
            this.emittedRecords.add(t);
        }

        public void collect(T t, long j) {
            this.emittedRecords.add(t);
            this.emittedTimestamps.add(Long.valueOf(j));
        }

        public void emitWatermark(Watermark watermark) {
            throw new UnsupportedOperationException();
        }

        public void markIdle() {
            throw new UnsupportedOperationException();
        }

        public void markActive() {
            throw new UnsupportedOperationException();
        }

        public List<T> getEmittedRecords() {
            return this.emittedRecords;
        }

        public List<Long> getEmittedTimestamps() {
            return this.emittedTimestamps;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest$SequenceNumberBasedDeserializationSchema.class */
    private static class SequenceNumberBasedDeserializationSchema implements KinesisDeserializationSchema<String> {
        private SequenceNumberBasedDeserializationSchema() {
        }

        public void deserialize(Record record, String str, String str2, Collector<String> collector) throws IOException {
            if (Objects.equals(record.sequenceNumber(), "emit")) {
                KinesisStreamsRecordEmitterTest.STRING_SCHEMA.deserialize(record.data().asByteArray(), collector);
            }
        }

        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    KinesisStreamsRecordEmitterTest() {
    }

    @Test
    void testEmitRecord() throws Exception {
        Instant now = Instant.now();
        List list = (List) Stream.of((Object[]) new Record[]{(Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-1"))).approximateArrivalTimestamp(now).build(), (Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-2"))).approximateArrivalTimestamp(now.plusSeconds(10L)).build(), (Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-3"))).approximateArrivalTimestamp(now.plusSeconds(20L)).sequenceNumber("some-sequence-number").build()}).collect(Collectors.toList());
        StartingPosition continueFromSequenceNumber = StartingPosition.continueFromSequenceNumber("some-sequence-number");
        CapturingSourceOutput capturingSourceOutput = new CapturingSourceOutput();
        KinesisShardSplitState testSplitState = TestUtil.getTestSplitState();
        KinesisStreamsRecordEmitter kinesisStreamsRecordEmitter = new KinesisStreamsRecordEmitter(KinesisDeserializationSchema.of(STRING_SCHEMA));
        Iterator it = list.iterator();
        while (it.hasNext()) {
            kinesisStreamsRecordEmitter.emitRecord((Record) it.next(), capturingSourceOutput, testSplitState);
        }
        AssertionsForInterfaceTypes.assertThat(capturingSourceOutput.getEmittedRecords()).containsExactly(new String[]{"data-1", "data-2", "data-3"});
        AssertionsForInterfaceTypes.assertThat(capturingSourceOutput.getEmittedTimestamps()).containsExactly(new Long[]{Long.valueOf(now.toEpochMilli()), Long.valueOf(now.plusSeconds(10L).toEpochMilli()), Long.valueOf(now.plusSeconds(20L).toEpochMilli())});
        AssertionsForInterfaceTypes.assertThat(testSplitState.getNextStartingPosition()).usingRecursiveComparison().isEqualTo(continueFromSequenceNumber);
    }

    @Test
    void testEmitRecordBasedOnSequenceNumber() throws Exception {
        Instant now = Instant.now();
        List list = (List) Stream.of((Object[]) new Record[]{(Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-1"))).sequenceNumber("emit").approximateArrivalTimestamp(now).build(), (Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-2"))).sequenceNumber("emit").approximateArrivalTimestamp(now.plusSeconds(10L)).build(), (Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-3"))).approximateArrivalTimestamp(now.plusSeconds(20L)).sequenceNumber("do-not-emit").build()}).collect(Collectors.toList());
        CapturingSourceOutput capturingSourceOutput = new CapturingSourceOutput();
        KinesisShardSplitState testSplitState = TestUtil.getTestSplitState();
        KinesisStreamsRecordEmitter kinesisStreamsRecordEmitter = new KinesisStreamsRecordEmitter(new SequenceNumberBasedDeserializationSchema());
        Iterator it = list.iterator();
        while (it.hasNext()) {
            kinesisStreamsRecordEmitter.emitRecord((Record) it.next(), capturingSourceOutput, testSplitState);
        }
        AssertionsForInterfaceTypes.assertThat(capturingSourceOutput.getEmittedRecords()).containsExactly(new String[]{"data-1", "data-2"});
        AssertionsForInterfaceTypes.assertThat(capturingSourceOutput.getEmittedTimestamps()).containsExactly(new Long[]{Long.valueOf(now.toEpochMilli()), Long.valueOf(now.plusSeconds(10L).toEpochMilli())});
    }

    @Test
    void testEmitRecordWithMetadata() throws Exception {
        Instant now = Instant.now();
        List list = (List) Stream.of((Object[]) new Record[]{(Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-1"))).approximateArrivalTimestamp(now).build(), (Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-2"))).approximateArrivalTimestamp(now.plusSeconds(10L)).build(), (Record) Record.builder().data(SdkBytes.fromByteArray(STRING_SCHEMA.serialize("data-3"))).approximateArrivalTimestamp(now.plusSeconds(20L)).sequenceNumber("some-sequence-number").build()}).collect(Collectors.toList());
        CapturingSourceOutput capturingSourceOutput = new CapturingSourceOutput();
        KinesisShardSplitState testSplitState = TestUtil.getTestSplitState();
        KinesisStreamsRecordEmitter kinesisStreamsRecordEmitter = new KinesisStreamsRecordEmitter(new AssertRecordMetadataDeserializationSchema(testSplitState.getStreamArn(), testSplitState.getShardId()));
        Iterator it = list.iterator();
        while (it.hasNext()) {
            kinesisStreamsRecordEmitter.emitRecord((Record) it.next(), capturingSourceOutput, testSplitState);
        }
        AssertionsForInterfaceTypes.assertThat(capturingSourceOutput.getEmittedRecords()).containsExactly(new String[]{"data-1", "data-2", "data-3"});
        AssertionsForInterfaceTypes.assertThat(capturingSourceOutput.getEmittedTimestamps()).containsExactly(new Long[]{Long.valueOf(now.toEpochMilli()), Long.valueOf(now.plusSeconds(10L).toEpochMilli()), Long.valueOf(now.plusSeconds(20L).toEpochMilli())});
    }
}
