/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window.buffers;

import java.io.EOFException;
import java.time.ZoneId;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;

/**
 * A {@link WindowBuffer} that stores incoming records in a {@link WindowBytesMultiMap} and hands
 * the buffered records of each window key to a {@link WindowCombineFunction} when flushed.
 *
 * <p>Records are grouped under a reusable {@link WindowKey} that is populated from the slice end
 * and the record key of every added element. The smallest slice end seen since the last draining
 * flush is tracked so that {@link #advanceProgress(long)} can decide when a flush is required.
 */
public final class RecordsWindowBuffer implements WindowBuffer {
    /** Receives every (window key, buffered records) entry on flush; closed in {@link #close()}. */
    private final WindowCombineFunction combineFunction;
    /** Backing multi-map that holds the buffered records per window key. */
    private final WindowBytesMultiMap recordsBuffer;
    /** Single key instance that is re-populated for every lookup instead of allocating a new one. */
    private final WindowKey reuseWindowKey;
    /** Converts incoming rows to their binary form before they are appended to the buffer. */
    private final AbstractRowDataSerializer<RowData> recordSerializer;
    /** Time zone forwarded to {@link TimeWindowUtil#isWindowFired} when checking progress. */
    private final ZoneId shiftTimeZone;
    /**
     * Smallest {@code sliceEnd} passed to {@link #addElement} since the last flush that drained
     * the buffer; {@link Long#MAX_VALUE} initially and right after such a flush.
     */
    private long minSliceEnd = Long.MAX_VALUE;

    /**
     * Creates a buffer backed by a new {@link WindowBytesMultiMap}.
     *
     * @param operatorOwner forwarded to the {@link WindowBytesMultiMap} constructor
     * @param memoryManager forwarded to the {@link WindowBytesMultiMap} constructor
     * @param memorySize forwarded to the {@link WindowBytesMultiMap} constructor
     * @param combineFunction function invoked for every buffered entry on flush
     * @param keySer serializer of the record key; also used to build the reusable window key
     * @param inputSer serializer of the input records; its arity is passed to the multi-map
     * @param shiftTimeZone time zone used when checking whether a window has fired
     */
    public RecordsWindowBuffer(Object operatorOwner, MemoryManager memoryManager, long memorySize, WindowCombineFunction combineFunction, PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer, ZoneId shiftTimeZone) {
        this.combineFunction = combineFunction;
        this.recordsBuffer = new WindowBytesMultiMap(operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity());
        this.recordSerializer = inputSer;
        this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
        this.shiftTimeZone = shiftTimeZone;
    }

    /**
     * Appends {@code element} to the records buffered for {@code (sliceEnd, key)}.
     *
     * <p>If the buffer signals an {@link EOFException} while appending, the buffer is flushed and
     * the append is attempted exactly one more time.
     *
     * @param key record key of the element
     * @param sliceEnd slice end the element is assigned to
     * @param element the record to buffer
     * @throws EOFException if the append still fails after the buffer has been flushed
     * @throws Exception if flushing or any buffer operation fails
     */
    @Override
    public void addElement(RowData key, long sliceEnd, RowData element) throws Exception {
        this.addElement(key, sliceEnd, element, true);
    }

    /**
     * Performs one append attempt and, when permitted, a single flush-and-retry on a full buffer.
     *
     * @param flushAndRetryWhenFull whether an {@link EOFException} from the append may be answered
     *     with a flush followed by one further attempt; {@code false} for the retry itself
     */
    private void addElement(RowData key, long sliceEnd, RowData element, boolean flushAndRetryWhenFull) throws Exception {
        // Track the smallest slice end so advanceProgress knows when buffered data must be flushed.
        this.minSliceEnd = Math.min(sliceEnd, this.minSliceEnd);
        this.reuseWindowKey.replace(sliceEnd, key);
        BytesMap.LookupInfo lookup = this.recordsBuffer.lookup(this.reuseWindowKey);
        try {
            this.recordsBuffer.append(lookup, this.recordSerializer.toBinaryRow(element));
        }
        catch (EOFException bufferFull) {
            if (!flushAndRetryWhenFull) {
                // The append already failed once and the buffer has been flushed in between.
                // NOTE(review): assumes another flush cannot free more space — confirm against
                // WindowBytesMultiMap. Propagating here avoids recursing until the stack overflows.
                throw bufferFull;
            }
            this.flush();
            // The lookup result is recomputed by the retry because flush() may reset the buffer.
            this.addElement(key, sliceEnd, element, false);
        }
    }

    /**
     * Flushes the buffer if {@link TimeWindowUtil#isWindowFired} reports that the window ending at
     * the smallest tracked slice end has fired for the given progress.
     *
     * @param progress current progress value, forwarded to {@link TimeWindowUtil#isWindowFired}
     * @throws Exception if flushing fails
     */
    @Override
    public void advanceProgress(long progress) throws Exception {
        if (TimeWindowUtil.isWindowFired(this.minSliceEnd, progress, this.shiftTimeZone)) {
            this.flush();
        }
    }

    /**
     * Passes every buffered entry to the combine function, then resets the buffer and the tracked
     * minimum slice end. Does nothing when the buffer holds no keys.
     *
     * @throws Exception if the combine function or the buffer fails
     */
    @Override
    public void flush() throws Exception {
        if (this.recordsBuffer.getNumKeys() <= 0L) {
            return;
        }
        KeyValueIterator entryIterator = this.recordsBuffer.getEntryIterator();
        while (entryIterator.advanceNext()) {
            this.combineFunction.combine((WindowKey)entryIterator.getKey(), entryIterator.getValue());
        }
        this.recordsBuffer.reset();
        this.minSliceEnd = Long.MAX_VALUE;
    }

    /**
     * Frees the records buffer and closes the combine function.
     *
     * <p>The combine function is closed even when freeing the buffer throws, so that a failure in
     * the first step does not leak the second resource.
     *
     * @throws Exception if freeing the buffer or closing the combine function fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.recordsBuffer.free();
        }
        finally {
            this.combineFunction.close();
        }
    }

    /**
     * {@link WindowBuffer.Factory} that captures the key and input serializers and creates
     * {@link RecordsWindowBuffer} instances from them.
     */
    public static final class Factory
    implements WindowBuffer.Factory {
        private static final long serialVersionUID = 1L;
        /** Key serializer handed to every created buffer. */
        private final PagedTypeSerializer<RowData> keySer;
        /** Input record serializer handed to every created buffer. */
        private final AbstractRowDataSerializer<RowData> inputSer;

        /**
         * @param keySer serializer of the record key
         * @param inputSer serializer of the input records
         */
        public Factory(PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer) {
            this.keySer = keySer;
            this.inputSer = inputSer;
        }

        /**
         * Creates a new {@link RecordsWindowBuffer} from the given runtime arguments and the
         * serializers held by this factory.
         */
        @Override
        public WindowBuffer create(Object operatorOwner, MemoryManager memoryManager, long memorySize, WindowCombineFunction combineFunction, ZoneId shiftTimeZone) {
            return new RecordsWindowBuffer(operatorOwner, memoryManager, memorySize, combineFunction, this.keySer, this.inputSer, shiftTimeZone);
        }
    }
}

