/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.assigners;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.assigners.InternalTimeWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.PanedWindowAssigner;
import org.apache.flink.util.IterableIterator;
import org.apache.flink.util.MathUtils;

public class SlidingWindowAssigner
extends PanedWindowAssigner<TimeWindow>
implements InternalTimeWindowAssigner {
    private static final long serialVersionUID = 4895551155814656518L;
    private final long size;
    private final long slide;
    private final long offset;
    private final long paneSize;
    private final int numPanesPerWindow;
    private final boolean isEventTime;

    protected SlidingWindowAssigner(long size, long slide, long offset, boolean isEventTime) {
        if (size <= 0L || slide <= 0L) {
            throw new IllegalArgumentException("SlidingWindowAssigner parameters must satisfy slide > 0 and size > 0");
        }
        this.size = size;
        this.slide = slide;
        this.offset = offset;
        this.isEventTime = isEventTime;
        this.paneSize = ArithmeticUtils.gcd((long)size, (long)slide);
        this.numPanesPerWindow = MathUtils.checkedDownCast((long)(size / this.paneSize));
    }

    @Override
    public Collection<TimeWindow> assignWindows(BaseRow element, long timestamp) {
        long lastStart;
        ArrayList<TimeWindow> windows = new ArrayList<TimeWindow>((int)(this.size / this.slide));
        for (long start = lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide); start > timestamp - this.size; start -= this.slide) {
            windows.add(new TimeWindow(start, start + this.size));
        }
        return windows;
    }

    @Override
    public TimeWindow assignPane(Object element, long timestamp) {
        long start = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.paneSize);
        return new TimeWindow(start, start + this.paneSize);
    }

    @Override
    public Iterable<TimeWindow> splitIntoPanes(TimeWindow window) {
        return new PanesIterable(window.getStart(), this.paneSize, this.numPanesPerWindow);
    }

    @Override
    public TimeWindow getLastWindow(TimeWindow pane) {
        long lastStart = TimeWindow.getWindowStartWithOffset(pane.getStart(), this.offset, this.slide);
        return new TimeWindow(lastStart, lastStart + this.size);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return this.isEventTime;
    }

    @Override
    public String toString() {
        return "SlidingWindow(" + this.size + ", " + this.slide + ")";
    }

    public static SlidingWindowAssigner of(Duration size, Duration slide) {
        return new SlidingWindowAssigner(size.toMillis(), slide.toMillis(), 0L, true);
    }

    public SlidingWindowAssigner withOffset(Duration offset) {
        return new SlidingWindowAssigner(this.size, this.slide, offset.toMillis(), this.isEventTime);
    }

    @Override
    public SlidingWindowAssigner withEventTime() {
        return new SlidingWindowAssigner(this.size, this.slide, this.offset, true);
    }

    @Override
    public SlidingWindowAssigner withProcessingTime() {
        return new SlidingWindowAssigner(this.size, this.slide, this.offset, false);
    }

    private static class PanesIterable
    implements IterableIterator<TimeWindow> {
        private final long paneSize;
        private long paneStart;
        private int numPanesRemaining;

        PanesIterable(long paneStart, long paneSize, int numPanesPerWindow) {
            this.paneStart = paneStart;
            this.paneSize = paneSize;
            this.numPanesRemaining = numPanesPerWindow;
        }

        public boolean hasNext() {
            return this.numPanesRemaining > 0;
        }

        public TimeWindow next() {
            TimeWindow window = new TimeWindow(this.paneStart, this.paneStart + this.paneSize);
            --this.numPanesRemaining;
            this.paneStart += this.paneSize;
            return window;
        }

        public Iterator<TimeWindow> iterator() {
            return this;
        }
    }
}

