/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing.buffers;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A {@link WindowBuffer} that keeps at most one record: the running result of applying a
 * {@link ReduceFunction} to every element stored so far.
 *
 * <p>The buffer is empty until the first call to {@link #storeElement(StreamRecord)} (or until it
 * is restored from a snapshot that contained a value). While empty, {@link #size()} is {@code 0}
 * and the element accessors return empty iterables.
 *
 * @param <T> type of the values being reduced
 */
@Internal
public class ReducingWindowBuffer<T>
implements WindowBuffer<T, T> {
    /** Function used to fold each newly stored value into the current aggregate. */
    private final ReduceFunction<T> reduceFunction;
    /** Serializer for the value type; wrapped in a record serializer when snapshotting. */
    private final TypeSerializer<T> serializer;
    /** Current aggregate, or {@code null} while no element has been stored. */
    private StreamRecord<T> data;

    /**
     * Creates an empty buffer.
     *
     * @param reduceFunction function used to combine the aggregate with each new value
     * @param serializer serializer for values of type {@code T}
     */
    protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
        this.reduceFunction = reduceFunction;
        this.serializer = serializer;
        this.data = null;
    }

    /**
     * Creates a buffer that starts out with the given aggregate.
     *
     * @param reduceFunction function used to combine the aggregate with each new value
     * @param data initial aggregate record; {@code null} yields an empty buffer
     * @param serializer serializer for values of type {@code T}
     */
    protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, StreamRecord<T> data, TypeSerializer<T> serializer) {
        this.reduceFunction = reduceFunction;
        this.serializer = serializer;
        this.data = data;
    }

    /**
     * Adds an element to the buffer. The first element is copied into a fresh record (value and
     * timestamp); every later element is reduced into the held record's value.
     *
     * @param element record whose value is folded into the aggregate
     * @throws Exception if the reduce function throws
     */
    @Override
    public void storeElement(StreamRecord<T> element) throws Exception {
        if (this.data == null) {
            // Copy rather than keep the caller's record, since the aggregate is mutated in place later.
            this.data = new StreamRecord<T>(element.getValue(), element.getTimestamp());
        } else {
            // Only the reduced value is handed to replace(); the incoming timestamp is not passed on.
            this.data.replace(this.reduceFunction.reduce(this.data.getValue(), element.getValue()));
        }
    }

    /**
     * Returns the aggregate record.
     *
     * @return a single-element iterable holding the aggregate, or an empty iterable if nothing
     *         has been stored yet
     */
    @Override
    public Iterable<StreamRecord<T>> getElements() {
        if (this.data == null) {
            // Avoid handing out a singleton that contains null for an empty buffer.
            return Collections.<StreamRecord<T>>emptySet();
        }
        return Collections.singleton(this.data);
    }

    /**
     * Returns the aggregate value without its record wrapper.
     *
     * @return a single-element iterable holding the aggregate value, or an empty iterable if
     *         nothing has been stored yet
     */
    @Override
    public Iterable<T> getUnpackedElements() {
        if (this.data == null) {
            // Previously this dereferenced the null record and threw a NullPointerException.
            return Collections.<T>emptySet();
        }
        return Collections.singleton(this.data.getValue());
    }

    /**
     * Returns the number of records held.
     *
     * @return {@code 1} once an aggregate exists, {@code 0} while the buffer is empty
     */
    @Override
    public int size() {
        return this.data == null ? 0 : 1;
    }

    /**
     * Writes the buffer state: a boolean presence flag, followed by the serialized aggregate
     * record when the flag is {@code true}.
     *
     * @param out target to write to
     * @throws IOException if writing fails
     */
    @Override
    public void snapshot(DataOutputView out) throws IOException {
        if (this.data != null) {
            out.writeBoolean(true);
            MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<T>(this.serializer);
            recordSerializer.serialize(this.data, out);
        } else {
            out.writeBoolean(false);
        }
    }

    /**
     * Factory that creates new {@link ReducingWindowBuffer} instances and restores them from the
     * format written by {@link ReducingWindowBuffer#snapshot(DataOutputView)}.
     *
     * @param <T> type of the values being reduced
     */
    public static class Factory<T>
    implements WindowBufferFactory<T, T, ReducingWindowBuffer<T>> {
        private static final long serialVersionUID = 1L;
        /** Reduce function handed to every buffer this factory creates. */
        private final ReduceFunction<T> reduceFunction;
        /** Value serializer handed to every buffer this factory creates. */
        private final TypeSerializer<T> serializer;

        /**
         * @param reduceFunction function the created buffers use to combine values
         * @param serializer serializer for values of type {@code T}
         */
        public Factory(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
            this.reduceFunction = reduceFunction;
            this.serializer = serializer;
        }

        /**
         * Creates a new, empty buffer.
         *
         * @return an empty {@link ReducingWindowBuffer}
         */
        @Override
        public ReducingWindowBuffer<T> create() {
            return new ReducingWindowBuffer<T>(this.reduceFunction, this.serializer);
        }

        /**
         * Rebuilds a buffer from a snapshot: reads the presence flag and, if set, the serialized
         * aggregate record.
         *
         * @param in source to read from
         * @return a buffer holding the restored aggregate, or an empty buffer if the snapshot
         *         recorded no value
         * @throws IOException if reading fails
         */
        @Override
        public ReducingWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
            boolean hasValue = in.readBoolean();
            if (hasValue) {
                MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<T>(this.serializer);
                StreamElement element = recordSerializer.deserialize(in);
                return new ReducingWindowBuffer<T>(this.reduceFunction, element.asRecord(), this.serializer);
            }
            return new ReducingWindowBuffer<T>(this.reduceFunction, this.serializer);
        }
    }
}

