/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway.local.result;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.CollectStreamResult;
import org.apache.flink.types.Row;

public class ChangelogCollectStreamResult<C>
extends CollectStreamResult<C>
implements ChangelogResult<C> {
    private List<Tuple2<Boolean, Row>> changeRecordBuffer = new ArrayList<Tuple2<Boolean, Row>>();
    private static final int CHANGE_RECORD_BUFFER_SIZE = 5000;

    public ChangelogCollectStreamResult(TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, ClassLoader classLoader) {
        super(tableSchema, config, gatewayAddress, gatewayPort, classLoader);
    }

    @Override
    public boolean isMaterialized() {
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
        Object object = this.resultLock;
        synchronized (object) {
            if (this.isRetrieving() && this.executionException.get() == null) {
                if (this.changeRecordBuffer.isEmpty()) {
                    return TypedResult.empty();
                }
                ArrayList<Tuple2<Boolean, Row>> change = new ArrayList<Tuple2<Boolean, Row>>(this.changeRecordBuffer);
                this.changeRecordBuffer.clear();
                this.resultLock.notify();
                return TypedResult.payload(change);
            }
            if (!this.isRetrieving() && !this.changeRecordBuffer.isEmpty()) {
                ArrayList<Tuple2<Boolean, Row>> change = new ArrayList<Tuple2<Boolean, Row>>(this.changeRecordBuffer);
                this.changeRecordBuffer.clear();
                return TypedResult.payload(change);
            }
            return this.handleMissingResult();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void processRecord(Tuple2<Boolean, Row> change) {
        Object object = this.resultLock;
        synchronized (object) {
            if (this.changeRecordBuffer.size() >= 5000) {
                try {
                    this.resultLock.wait();
                }
                catch (InterruptedException interruptedException) {}
            } else {
                this.changeRecordBuffer.add(change);
            }
        }
    }
}

