/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing.buffers;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Collections;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class FoldingWindowBuffer<T, ACC>
implements WindowBuffer<T, ACC> {
    private final FoldFunction<T, ACC> foldFunction;
    private final TypeSerializer<ACC> accSerializer;
    private StreamRecord<ACC> data;

    protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, ACC initialAccumulator, TypeSerializer<ACC> accSerializer) {
        this.foldFunction = foldFunction;
        this.accSerializer = accSerializer;
        this.data = new StreamRecord<ACC>(initialAccumulator);
    }

    protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, StreamRecord<ACC> initialAccumulator, TypeSerializer<ACC> accSerializer) {
        this.foldFunction = foldFunction;
        this.accSerializer = accSerializer;
        this.data = initialAccumulator;
    }

    @Override
    public void storeElement(StreamRecord<T> element) throws Exception {
        this.data.replace(this.foldFunction.fold(this.data.getValue(), element.getValue()), element.getTimestamp());
    }

    @Override
    public Iterable<StreamRecord<ACC>> getElements() {
        return Collections.singleton(this.data);
    }

    @Override
    public Iterable<ACC> getUnpackedElements() {
        return Collections.singleton(this.data.getValue());
    }

    @Override
    public int size() {
        return 1;
    }

    @Override
    public void snapshot(DataOutputView out) throws IOException {
        MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<ACC>(this.accSerializer);
        recordSerializer.serialize(this.data, out);
    }

    public static class Factory<T, ACC>
    implements WindowBufferFactory<T, ACC, FoldingWindowBuffer<T, ACC>> {
        private static final long serialVersionUID = 1L;
        private final FoldFunction<T, ACC> foldFunction;
        private final TypeSerializer<ACC> accSerializer;
        private transient ACC initialAccumulator;

        public Factory(FoldFunction<T, ACC> foldFunction, ACC initialValue, TypeSerializer<ACC> accSerializer) {
            this.foldFunction = foldFunction;
            this.accSerializer = accSerializer;
            this.initialAccumulator = initialValue;
        }

        @Override
        public FoldingWindowBuffer<T, ACC> create() {
            return new FoldingWindowBuffer<T, Object>(this.foldFunction, this.accSerializer.copy(this.initialAccumulator), this.accSerializer);
        }

        @Override
        public FoldingWindowBuffer<T, ACC> restoreFromSnapshot(DataInputView in) throws IOException {
            MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<ACC>(this.accSerializer);
            StreamElement element = recordSerializer.deserialize(in);
            return new FoldingWindowBuffer<T, ACC>(this.foldFunction, element.asRecord(), this.accSerializer);
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            byte[] serializedDefaultValue;
            out.defaultWriteObject();
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)baos);){
                this.accSerializer.serialize(this.initialAccumulator, (DataOutputView)outView);
                outView.flush();
                serializedDefaultValue = baos.toByteArray();
            }
            catch (Exception e) {
                throw new IOException("Unable to serialize initial accumulator of type " + this.initialAccumulator.getClass().getSimpleName() + ".", e);
            }
            out.writeInt(serializedDefaultValue.length);
            out.write(serializedDefaultValue);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            int size = in.readInt();
            byte[] buffer = new byte[size];
            int bytesRead = in.read(buffer);
            if (bytesRead != size) {
                throw new RuntimeException("Read size does not match expected size.");
            }
            try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
                 DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper((InputStream)bais);){
                this.initialAccumulator = this.accSerializer.deserialize((DataInputView)inView);
            }
            catch (Exception e) {
                throw new IOException("Unable to deserialize initial accumulator.", e);
            }
        }
    }
}

