/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api.datastream;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsBatchSnapshot;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class WatermarkAlignmentITCase {
    /** Package-private no-argument constructor; performs no initialization of its own. */
    WatermarkAlignmentITCase() {
        super();
    }

    /**
     * Runs a parallelism-2 job that reads the numbers 0..100 from an
     * {@link EagerlyFinishingNumberSequenceSource} with watermark alignment enabled (group
     * {@code "g1"}) and checks that every number is emitted exactly once.
     *
     * <p>Each element's own value is used as its event timestamp, and a pass-through filter
     * sleeps 10 ms per record.
     *
     * @throws Exception if job execution or result collection fails
     */
    @Test
    void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {
        final long first = 0L;
        final long last = 100L;
        final int expectedCount = (int) (last - first + 1);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Explicit <Long> typing: with raw types the lambda parameter would be Object and could
        // not be returned as the long timestamp. SerializableTimestampAssigner is already
        // serializable, so no extra "& Serializable" intersection cast is needed.
        WatermarkStrategy<Long> watermarkStrategy =
                WatermarkStrategy.<Long>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<Long>)
                                        (value, recordTimestamp) -> value)
                        .withWatermarkAlignment(
                                "g1", Duration.ofMillis(10L), Duration.ofMillis(1L));

        SingleOutputStreamOperator<Long> stream =
                env.fromSource(
                                new EagerlyFinishingNumberSequenceSource(first, last),
                                watermarkStrategy,
                                "Sequence Source")
                        .filter(
                                (FilterFunction<Long>)
                                        value -> {
                                            // NOTE(review): presumably slows consumption so the
                                            // job is still running while splits finish - confirm.
                                            Thread.sleep(10L);
                                            return true;
                                        });

        List<Long> result = stream.executeAndCollect(expectedCount);
        // Records arrive interleaved from the parallel subtasks; sort before comparing.
        Collections.sort(result);

        List<Long> expected =
                LongStream.rangeClosed(first, last).boxed().collect(Collectors.toList());
        // JUnit's contract is (expected, actual); keeping that order makes failure messages
        // report the right side as "expected".
        Assertions.assertIterableEquals(expected, result);
    }

    static class EagerlyFinishingIteratorSourceEnumerator
    implements SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>>,
    SupportsBatchSnapshot {
        private final SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context;
        private final Queue<NumberSequenceSource.NumberSequenceSplit> remainingSplits;

        public EagerlyFinishingIteratorSourceEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context, Collection<NumberSequenceSource.NumberSequenceSplit> splits) {
            this.context = (SplitEnumeratorContext)Preconditions.checkNotNull(context);
            this.remainingSplits = new ArrayDeque<NumberSequenceSource.NumberSequenceSplit>(splits);
            this.context.metricGroup().setUnassignedSplitsGauge(() -> this.remainingSplits.size());
        }

        public void start() {
        }

        public void close() {
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            NumberSequenceSource.NumberSequenceSplit nextSplit = this.remainingSplits.poll();
            if (nextSplit != null) {
                this.context.assignSplit((SourceSplit)nextSplit, subtaskId);
            }
            if (this.remainingSplits.size() == 0) {
                for (int i = 0; i < this.context.currentParallelism(); ++i) {
                    this.context.signalNoMoreSplits(i);
                }
            }
        }

        public void addSplitsBack(List<NumberSequenceSource.NumberSequenceSplit> splits, int subtaskId) {
            throw new UnsupportedOperationException();
        }

        public Collection<NumberSequenceSource.NumberSequenceSplit> snapshotState(long checkpointId) throws Exception {
            return this.remainingSplits;
        }

        public void addReader(int subtaskId) {
        }
    }

    static class EagerlyFinishingNumberSequenceSource
    extends NumberSequenceSource {
        public EagerlyFinishingNumberSequenceSource(long from, long to) {
            super(from, to);
        }

        public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> enumContext) {
            List splits = this.splitNumberRange(this.getFrom(), this.getTo(), enumContext.currentParallelism());
            return new EagerlyFinishingIteratorSourceEnumerator(enumContext, splits);
        }
    }
}

