/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class GenericWriteAheadSink<IN>
extends AbstractStreamOperator<IN>
implements OneInputStreamOperator<IN, IN> {
    private static final long serialVersionUID = 1L;
    protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class);
    private final CheckpointCommitter committer;
    private transient AbstractStateBackend.CheckpointStateOutputView out;
    protected final TypeSerializer<IN> serializer;
    private final String id;
    private ExactlyOnceState state = new ExactlyOnceState();

    public GenericWriteAheadSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
        this.committer = committer;
        this.serializer = serializer;
        this.id = UUID.randomUUID().toString();
        this.committer.setJobId(jobID);
        this.committer.createResource();
    }

    @Override
    public void open() throws Exception {
        this.committer.setOperatorId(this.id);
        this.committer.setOperatorSubtaskId(this.getRuntimeContext().getIndexOfThisSubtask());
        this.committer.open();
        this.cleanState();
    }

    @Override
    public void close() throws Exception {
        this.committer.close();
    }

    private void saveHandleInState(long checkpointId, long timestamp) throws Exception {
        if (this.out != null) {
            StateHandle handle;
            try {
                handle = this.out.closeAndGetHandle();
            }
            catch (Exception e) {
                throw new Exception("Could not close and get state handle from checkpoint state output view belonging to " + this.getOperatorName() + '.', e);
            }
            if (this.state.pendingHandles.containsKey(checkpointId)) {
                try {
                    handle.discardState();
                }
                catch (Exception exception) {
                    LOG.warn("Could not discard state handle for checkpoint {} of {}, which already has been stored.", new Object[]{checkpointId, this.getOperatorName(), exception});
                }
            } else {
                this.state.pendingHandles.put(checkpointId, (Tuple2<Long, StateHandle<DataInputView>>)new Tuple2((Object)timestamp, (Object)handle));
            }
            this.out = null;
        }
    }

    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
        try {
            this.saveHandleInState(checkpointId, timestamp);
        }
        catch (Exception e) {
            try {
                taskState.discardState();
            }
            catch (Exception discardException) {
                LOG.warn("Could not discard stream task state of {}.", (Object)this.getOperatorName(), (Object)discardException);
            }
            throw new Exception("Could not save handle in state of " + this.getOperatorName() + '.', e);
        }
        taskState.setFunctionState(this.state);
        return taskState;
    }

    @Override
    public void restoreState(StreamTaskState state) throws Exception {
        super.restoreState(state);
        this.state = (ExactlyOnceState)state.getFunctionState();
        this.out = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanState() throws Exception {
        TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> treeMap = this.state.pendingHandles;
        synchronized (treeMap) {
            Set<Long> pastCheckpointIds = this.state.pendingHandles.keySet();
            HashSet<Long> checkpointsToRemove = new HashSet<Long>();
            for (Long pastCheckpointId : pastCheckpointIds) {
                if (!this.committer.isCheckpointCommitted(pastCheckpointId)) continue;
                checkpointsToRemove.add(pastCheckpointId);
            }
            for (Long toRemove : checkpointsToRemove) {
                this.state.pendingHandles.remove(toRemove);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        super.notifyOfCompletedCheckpoint(checkpointId);
        TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> treeMap = this.state.pendingHandles;
        synchronized (treeMap) {
            Tuple2<Long, StateHandle<DataInputView>> handle;
            Set<Long> pastCheckpointIds = this.state.pendingHandles.keySet();
            HashSet<Long> checkpointsToRemove = new HashSet<Long>();
            for (Long pastCheckpointId : pastCheckpointIds) {
                if (pastCheckpointId > checkpointId) continue;
                try {
                    if (!this.committer.isCheckpointCommitted(pastCheckpointId)) {
                        handle = this.state.pendingHandles.get(pastCheckpointId);
                        DataInputView in = (DataInputView)((StateHandle)handle.f1).getState(this.getUserCodeClassloader());
                        boolean success = this.sendValues((Iterable<IN>)new ReusingMutableToRegularIteratorWrapper((MutableObjectIterator)new InputViewIterator(in, this.serializer), this.serializer), (Long)handle.f0);
                        if (!success) continue;
                        this.committer.commitCheckpoint(pastCheckpointId);
                        checkpointsToRemove.add(pastCheckpointId);
                        continue;
                    }
                    checkpointsToRemove.add(pastCheckpointId);
                }
                catch (Exception e) {
                    LOG.error("Could not commit checkpoint.", (Throwable)e);
                    break;
                }
            }
            for (Long toRemove : checkpointsToRemove) {
                handle = this.state.pendingHandles.get(toRemove);
                this.state.pendingHandles.remove(toRemove);
                ((StateHandle)handle.f1).discardState();
            }
        }
    }

    protected abstract boolean sendValues(Iterable<IN> var1, long var2) throws Exception;

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        if (this.out == null) {
            this.out = this.getStateBackend().createCheckpointStateOutputView(0L, 0L);
        }
        this.serializer.serialize(value, (DataOutputView)this.out);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
    }

    public static class ExactlyOnceState
    implements StateHandle<Serializable> {
        private static final long serialVersionUID = -3571063495273460743L;
        protected TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> pendingHandles = new TreeMap();

        public TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> getState(ClassLoader userCodeClassLoader) throws Exception {
            return this.pendingHandles;
        }

        public void discardState() throws Exception {
        }

        public long getStateSize() throws Exception {
            int stateSize = 0;
            for (Tuple2<Long, StateHandle<DataInputView>> pair : this.pendingHandles.values()) {
                stateSize = (int)((long)stateSize + ((StateHandle)pair.f1).getStateSize());
            }
            return stateSize;
        }

        public void close() throws IOException {
            Throwable exception = null;
            for (Tuple2<Long, StateHandle<DataInputView>> pair : this.pendingHandles.values()) {
                StateHandle handle = (StateHandle)pair.f1;
                if (handle == null) continue;
                try {
                    handle.close();
                }
                catch (Throwable t) {
                    if (exception == null) continue;
                    exception = t;
                }
            }
            if (exception != null) {
                ExceptionUtils.rethrowIOException(exception);
            }
        }

        public String toString() {
            return this.pendingHandles.toString();
        }
    }
}

