/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
import org.apache.flink.util.OptionalFailure;
import org.junit.Assert;

public class TestCoordinationRequestHandler<T>
implements CoordinationRequestHandler {
    private static final int BATCH_SIZE = 3;
    private final TypeSerializer<T> serializer;
    private final String accumulatorName;
    private int checkpointCountDown;
    private LinkedList<T> data;
    private List<T> checkpointingData;
    private List<T> checkpointedData;
    private LinkedList<T> buffered;
    private List<T> checkpointingBuffered;
    private List<T> checkpointedBuffered;
    private String version;
    private long offset;
    private long checkpointingOffset;
    private long checkpointedOffset;
    private Map<String, OptionalFailure<Object>> accumulatorResults;
    private Random random;
    private boolean closed;

    public TestCoordinationRequestHandler(List<T> data, TypeSerializer<T> serializer, String accumulatorName) {
        this.serializer = serializer;
        this.accumulatorName = accumulatorName;
        this.checkpointCountDown = 0;
        this.data = new LinkedList<T>(data);
        this.checkpointedData = new ArrayList<T>(data);
        this.buffered = new LinkedList();
        this.checkpointedBuffered = new ArrayList<T>();
        this.version = UUID.randomUUID().toString();
        this.offset = 0L;
        this.checkpointingOffset = 0L;
        this.checkpointedOffset = 0L;
        this.accumulatorResults = new HashMap<String, OptionalFailure<Object>>();
        this.random = new Random();
        this.closed = false;
    }

    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
        CollectCoordinationResponse response;
        if (this.closed) {
            throw new RuntimeException("Handler closed");
        }
        Assert.assertTrue((boolean)(request instanceof CollectCoordinationRequest));
        CollectCoordinationRequest collectRequest = (CollectCoordinationRequest)request;
        for (int i = this.random.nextInt(3) + 1; i > 0; --i) {
            int r;
            if (this.checkpointCountDown > 0) {
                --this.checkpointCountDown;
                if (this.checkpointCountDown == 0) {
                    this.checkpointedData = this.checkpointingData;
                    this.checkpointedBuffered = this.checkpointingBuffered;
                    this.checkpointedOffset = this.checkpointingOffset;
                }
            }
            if ((r = this.random.nextInt(10)) < 6) {
                int size = Math.min(this.data.size(), 6 - this.buffered.size());
                if (size > 0) {
                    size = this.random.nextInt(size) + 1;
                }
                for (int j = 0; j < size; ++j) {
                    this.buffered.add(this.data.removeFirst());
                }
                if (!this.data.isEmpty()) continue;
                this.buildAccumulatorResults();
                this.closed = true;
                break;
            }
            if (r < 9) {
                if (this.checkpointCountDown != 0) continue;
                this.checkpointCountDown = this.random.nextInt(5) + 1;
                this.checkpointingData = new ArrayList<T>(this.data);
                this.checkpointingBuffered = new ArrayList<T>(this.buffered);
                this.checkpointingOffset = this.offset;
                continue;
            }
            this.checkpointCountDown = 0;
            this.version = UUID.randomUUID().toString();
            this.data = new LinkedList<T>(this.checkpointedData);
            this.buffered = new LinkedList<T>(this.checkpointedBuffered);
            this.offset = this.checkpointedOffset;
        }
        Assert.assertTrue((this.offset <= collectRequest.getOffset() ? 1 : 0) != 0);
        List subList = Collections.emptyList();
        if (collectRequest.getVersion().equals(this.version)) {
            while (this.buffered.size() > 0 && collectRequest.getOffset() > this.offset) {
                this.buffered.removeFirst();
                ++this.offset;
            }
            subList = new ArrayList();
            Iterator iterator = this.buffered.iterator();
            for (int i = 0; i < 3 && iterator.hasNext(); ++i) {
                subList.add(iterator.next());
            }
        }
        try {
            response = this.random.nextBoolean() ? new CollectCoordinationResponse(this.version, this.checkpointedOffset, subList, this.serializer) : new CollectCoordinationResponse(collectRequest.getVersion(), -1L, Collections.emptyList(), this.serializer);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return CompletableFuture.completedFuture(response);
    }

    public boolean isClosed() {
        return this.closed;
    }

    public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
        return this.accumulatorResults;
    }

    private void buildAccumulatorResults() {
        ArrayList<T> finalResults = new ArrayList<T>(this.buffered);
        SerializedListAccumulator listAccumulator = new SerializedListAccumulator();
        try {
            byte[] serializedResult = CollectSinkFunction.serializeAccumulatorResult((long)this.offset, (String)this.version, (long)this.checkpointedOffset, finalResults, this.serializer);
            listAccumulator.add((Object)serializedResult, (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.accumulatorResults.put(this.accumulatorName, (OptionalFailure<Object>)OptionalFailure.of((Object)listAccumulator.getLocalValue()));
    }
}

