/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class RecordAttributesPropagationITCase {
    /**
     * Builds a job with two {@link SourceWithBacklog} sources feeding a one-input operator and a
     * two-input operator, runs it to completion, and verifies which {@link RecordAttributes} the
     * operators observed.
     *
     * <p>Each operator input is expected to have recorded exactly two attributes, in order: one
     * with backlog set to {@code true} followed by one with backlog set to {@code false}.
     *
     * @throws Exception if job execution fails
     */
    @Test
    void testRecordAttributesPropagation() throws Exception {
        // The operators record into static lists; drop anything left over from an earlier
        // execution in the same JVM so the exact-content assertions below stay meaningful.
        OneInputOperator.receivedRecordAttributes.clear();
        TwoInputOperator.receivedRecordAttributes1.clear();
        TwoInputOperator.receivedRecordAttributes2.clear();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SourceWithBacklog source1 = new SourceWithBacklog();
        SourceWithBacklog source2 = new SourceWithBacklog();

        env.fromSource(source1, WatermarkStrategy.noWatermarks(), "source1")
                .returns(Long.class)
                .transform("my_op1", Types.LONG, new OneInputOperator())
                .connect(
                        env.fromSource(source2, WatermarkStrategy.noWatermarks(), "source2")
                                .returns(Long.class))
                .transform("my_op2", Types.LONG, new TwoInputOperator())
                .addSink(new DiscardingSink<>());
        env.execute();

        RecordAttributes backlog =
                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
        RecordAttributes nonBacklog =
                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();

        Assertions.assertThat(OneInputOperator.receivedRecordAttributes)
                .containsExactly(backlog, nonBacklog);
        Assertions.assertThat(TwoInputOperator.receivedRecordAttributes1)
                .containsExactly(backlog, nonBacklog);
        Assertions.assertThat(TwoInputOperator.receivedRecordAttributes2)
                .containsExactly(backlog, nonBacklog);
    }

    static class TwoInputOperator
    extends AbstractStreamOperator<Long>
    implements TwoInputStreamOperator<Long, Long, Long> {
        private static final List<RecordAttributes> receivedRecordAttributes1 = new ArrayList<RecordAttributes>();
        private static final List<RecordAttributes> receivedRecordAttributes2 = new ArrayList<RecordAttributes>();

        TwoInputOperator() {
        }

        public void processRecordAttributes1(RecordAttributes recordAttributes) {
            receivedRecordAttributes1.add(recordAttributes);
            super.processRecordAttributes1(recordAttributes);
        }

        public void processRecordAttributes2(RecordAttributes recordAttributes) {
            receivedRecordAttributes2.add(recordAttributes);
            super.processRecordAttributes2(recordAttributes);
        }

        public void processElement1(StreamRecord<Long> element) {
            this.output.collect(element);
        }

        public void processElement2(StreamRecord<Long> element) {
            this.output.collect(element);
        }
    }

    static class OneInputOperator
    extends AbstractStreamOperator<Long>
    implements OneInputStreamOperator<Long, Long> {
        private static final List<RecordAttributes> receivedRecordAttributes = new ArrayList<RecordAttributes>();

        OneInputOperator() {
        }

        public void processElement(StreamRecord<Long> element) throws Exception {
            this.output.collect(element);
        }

        public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
            receivedRecordAttributes.add(recordAttributes);
            super.processRecordAttributes(recordAttributes);
        }
    }

    static class MockSplit
    implements SourceSplit,
    Serializable {
        MockSplit() {
        }

        public String splitId() {
            return "0";
        }
    }

    static class SplitEnumeratorWithBacklog
    implements SplitEnumerator<MockSplit, Long> {
        private final SplitEnumeratorContext<MockSplit> context;
        private final ExecutorService executor;

        SplitEnumeratorWithBacklog(SplitEnumeratorContext<MockSplit> context) {
            this.context = context;
            this.executor = Executors.newSingleThreadExecutor();
        }

        public void start() {
            this.executor.submit(() -> {
                this.context.setIsProcessingBacklog(true);
                try {
                    Thread.sleep(3000L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                this.context.setIsProcessingBacklog(false);
                for (int i = 0; i < this.context.currentParallelism(); ++i) {
                    this.context.signalNoMoreSplits(i);
                }
            });
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        }

        public void addSplitsBack(List<MockSplit> splits, int subtaskId) {
        }

        public void addReader(int subtaskId) {
        }

        public Long snapshotState(long checkpointId) throws Exception {
            return null;
        }

        public void close() throws IOException {
        }
    }

    static class SourceWithBacklog
    implements Source<Long, MockSplit, Long> {
        SourceWithBacklog() {
        }

        public Boundedness getBoundedness() {
            return Boundedness.BOUNDED;
        }

        public SplitEnumerator<MockSplit, Long> createEnumerator(SplitEnumeratorContext<MockSplit> enumContext) {
            return new SplitEnumeratorWithBacklog(enumContext);
        }

        public SplitEnumerator<MockSplit, Long> restoreEnumerator(SplitEnumeratorContext<MockSplit> enumContext, Long checkpoint) {
            return new SplitEnumeratorWithBacklog(enumContext);
        }

        public SimpleVersionedSerializer<MockSplit> getSplitSerializer() {
            return new SimpleVersionedSerializer<MockSplit>(){

                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(MockSplit obj) {
                    return new byte[0];
                }

                public MockSplit deserialize(int version, byte[] serialized) {
                    return new MockSplit();
                }
            };
        }

        public SimpleVersionedSerializer<Long> getEnumeratorCheckpointSerializer() {
            return new SimpleVersionedSerializer<Long>(){

                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Long obj) {
                    return new byte[0];
                }

                public Long deserialize(int version, byte[] serialized) {
                    return 0L;
                }
            };
        }

        public SourceReader<Long, MockSplit> createReader(SourceReaderContext readerContext) {
            return new SourceReader<Long, MockSplit>(){
                private boolean noMoreSplit;

                public void start() {
                }

                public InputStatus pollNext(ReaderOutput<Long> output) {
                    if (this.noMoreSplit) {
                        return InputStatus.END_OF_INPUT;
                    }
                    return InputStatus.MORE_AVAILABLE;
                }

                public List<MockSplit> snapshotState(long checkpointId) {
                    return null;
                }

                public CompletableFuture<Void> isAvailable() {
                    return null;
                }

                public void addSplits(List<MockSplit> splits) {
                }

                public void notifyNoMoreSplits() {
                    this.noMoreSplit = true;
                }

                public void close() {
                }
            };
        }
    }
}

