package org.apache.flink.test.streaming.runtime;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase.class */
public class RecordAttributesPropagationITCase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase$MockSplit.class */
    /**
     * Split type used by {@link SourceWithBacklog}. It carries no state and always reports the
     * same split id.
     */
    public static class MockSplit implements SourceSplit, Serializable {
        private static final long serialVersionUID = 1L;

        /** The single split id every instance reports. */
        private static final String SPLIT_ID = "0";

        MockSplit() {
        }

        /** Returns the fixed split id {@code "0"}. */
        public String splitId() {
            return SPLIT_ID;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase$OneInputOperator.class */
    /**
     * One-input operator that forwards every element unchanged and records each
     * {@link RecordAttributes} it is asked to process.
     */
    static class OneInputOperator extends AbstractStreamOperator<Long> implements OneInputStreamOperator<Long, Long> {
        /**
         * Every {@link RecordAttributes} seen by any instance, in arrival order. The list is
         * static so the test method can read it through the class after the job has run.
         */
        private static final List<RecordAttributes> receivedRecordAttributes = new ArrayList<>();

        /** Forwards the incoming record to the output without modification. */
        public void processElement(StreamRecord<Long> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        /** Records the attributes, then delegates to the superclass implementation. */
        public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
            receivedRecordAttributes.add(recordAttributes);
            super.processRecordAttributes(recordAttributes);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase$SourceWithBacklog.class */
    /**
     * Bounded test source that never emits a record. Its enumerator is a
     * {@link SplitEnumeratorWithBacklog}; its reader reports {@link InputStatus#MORE_AVAILABLE}
     * until it has been told that no more splits will arrive, and
     * {@link InputStatus#END_OF_INPUT} afterwards.
     */
    static class SourceWithBacklog implements Source<Long, MockSplit, Long> {

        public Boundedness getBoundedness() {
            return Boundedness.BOUNDED;
        }

        public SplitEnumerator<MockSplit, Long> createEnumerator(SplitEnumeratorContext<MockSplit> splitEnumeratorContext) {
            return new SplitEnumeratorWithBacklog(splitEnumeratorContext);
        }

        /** Ignores the checkpoint and creates a fresh enumerator, exactly like {@code createEnumerator}. */
        public SplitEnumerator<MockSplit, Long> restoreEnumerator(SplitEnumeratorContext<MockSplit> splitEnumeratorContext, Long l) {
            return new SplitEnumeratorWithBacklog(splitEnumeratorContext);
        }

        /** Returns a serializer that writes zero bytes and always reads back a new {@link MockSplit}. */
        public SimpleVersionedSerializer<MockSplit> getSplitSerializer() {
            return new SimpleVersionedSerializer<MockSplit>() {
                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(MockSplit mockSplit) {
                    return new byte[0];
                }

                public MockSplit deserialize(int i, byte[] bArr) {
                    return new MockSplit();
                }
            };
        }

        /** Returns a serializer that writes zero bytes and always reads back {@code 0L}. */
        public SimpleVersionedSerializer<Long> getEnumeratorCheckpointSerializer() {
            return new SimpleVersionedSerializer<Long>() {
                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Long l) {
                    return new byte[0];
                }

                public Long deserialize(int i, byte[] bArr) {
                    return 0L;
                }
            };
        }

        /** Creates a reader that produces no records and ends once no more splits are announced. */
        public SourceReader<Long, MockSplit> createReader(SourceReaderContext sourceReaderContext) {
            return new SourceReader<Long, MockSplit>() {
                /** Set once {@link #notifyNoMoreSplits()} has been called. */
                private boolean noMoreSplit;

                public void start() {
                }

                public InputStatus pollNext(ReaderOutput<Long> readerOutput) {
                    // Nothing is ever written to readerOutput; only the status changes.
                    return this.noMoreSplit ? InputStatus.END_OF_INPUT : InputStatus.MORE_AVAILABLE;
                }

                public List<MockSplit> snapshotState(long j) {
                    // The reader holds no splits; return an empty list rather than null.
                    return Collections.emptyList();
                }

                public CompletableFuture<Void> isAvailable() {
                    // pollNext never reports "nothing available", so the reader is always ready;
                    // return a completed future rather than null so callers cannot hit an NPE.
                    return CompletableFuture.completedFuture(null);
                }

                public void addSplits(List<MockSplit> list) {
                }

                public void notifyNoMoreSplits() {
                    this.noMoreSplit = true;
                }

                public void close() {
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase$SplitEnumeratorWithBacklog.class */
    /**
     * Enumerator that assigns no splits. On {@link #start()} it runs a background task that
     * marks the source as processing backlog, waits, clears the backlog flag and then signals
     * "no more splits" to every reader subtask.
     */
    public static class SplitEnumeratorWithBacklog implements SplitEnumerator<MockSplit, Long> {
        /** How long the backlog flag stays set before it is cleared. */
        private static final long BACKLOG_DURATION_MS = 3000L;

        private final SplitEnumeratorContext<MockSplit> context;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        SplitEnumeratorWithBacklog(SplitEnumeratorContext<MockSplit> splitEnumeratorContext) {
            this.context = splitEnumeratorContext;
        }

        /** Submits the backlog-toggling task to the single-thread executor. */
        public void start() {
            this.executor.submit(() -> {
                this.context.setIsProcessingBacklog(true);
                try {
                    Thread.sleep(BACKLOG_DURATION_MS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before propagating so the worker thread
                    // still observes the interruption (e.g. after close() calls shutdownNow()).
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                this.context.setIsProcessingBacklog(false);
                final int parallelism = this.context.currentParallelism();
                for (int subtask = 0; subtask < parallelism; subtask++) {
                    this.context.signalNoMoreSplits(subtask);
                }
            });
        }

        public void handleSplitRequest(int i, @Nullable String str) {
        }

        public void addSplitsBack(List<MockSplit> list, int i) {
        }

        public void addReader(int i) {
        }

        /** This enumerator keeps no checkpointable state; always returns {@code null}. */
        public Long snapshotState(long j) throws Exception {
            return null;
        }

        /** Stops the background executor so its worker thread does not outlive the enumerator. */
        public void close() throws IOException {
            this.executor.shutdownNow();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase$TwoInputOperator.class */
    /**
     * Two-input operator that forwards every element from either input unchanged and records,
     * separately per input, each {@link RecordAttributes} it is asked to process.
     */
    static class TwoInputOperator extends AbstractStreamOperator<Long> implements TwoInputStreamOperator<Long, Long, Long> {
        /** Attributes received on the first input, in arrival order; read by the test method. */
        private static final List<RecordAttributes> receivedRecordAttributes1 = new ArrayList<>();
        /** Attributes received on the second input, in arrival order; read by the test method. */
        private static final List<RecordAttributes> receivedRecordAttributes2 = new ArrayList<>();

        /** Records the first-input attributes, then delegates to the superclass implementation. */
        public void processRecordAttributes1(RecordAttributes recordAttributes) {
            receivedRecordAttributes1.add(recordAttributes);
            super.processRecordAttributes1(recordAttributes);
        }

        /** Records the second-input attributes, then delegates to the superclass implementation. */
        public void processRecordAttributes2(RecordAttributes recordAttributes) {
            receivedRecordAttributes2.add(recordAttributes);
            super.processRecordAttributes2(recordAttributes);
        }

        /** Forwards a record from the first input to the output without modification. */
        public void processElement1(StreamRecord<Long> streamRecord) {
            this.output.collect(streamRecord);
        }

        /** Forwards a record from the second input to the output without modification. */
        public void processElement2(StreamRecord<Long> streamRecord) {
            this.output.collect(streamRecord);
        }
    }

    /**
     * Runs a job with two {@link SourceWithBacklog} sources feeding a one-input operator and a
     * two-input operator, then checks that each operator input recorded exactly two
     * {@link RecordAttributes}: one built with backlog {@code true} followed by one built with
     * backlog {@code false}.
     */
    @Test
    void testRecordAttributesPropagation() throws Exception {
        // The collectors are static and would otherwise keep entries from an earlier run in the
        // same JVM, which would make the exact-content assertions below fail.
        OneInputOperator.receivedRecordAttributes.clear();
        TwoInputOperator.receivedRecordAttributes1.clear();
        TwoInputOperator.receivedRecordAttributes2.clear();

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment
                .fromSource(new SourceWithBacklog(), WatermarkStrategy.noWatermarks(), "source1")
                .returns(Long.class)
                .transform("my_op1", Types.LONG, new OneInputOperator())
                .connect(
                        executionEnvironment
                                .fromSource(new SourceWithBacklog(), WatermarkStrategy.noWatermarks(), "source2")
                                .returns(Long.class))
                .transform("my_op2", Types.LONG, new TwoInputOperator())
                .addSink(new DiscardingSink<>());
        executionEnvironment.execute();

        RecordAttributes backlog = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
        RecordAttributes nonBacklog = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
        Assertions.assertThat(OneInputOperator.receivedRecordAttributes).containsExactly(backlog, nonBacklog);
        Assertions.assertThat(TwoInputOperator.receivedRecordAttributes1).containsExactly(backlog, nonBacklog);
        Assertions.assertThat(TwoInputOperator.receivedRecordAttributes2).containsExactly(backlog, nonBacklog);
    }
}
