/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.languagebinding.api.java.common.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
import org.apache.flink.languagebinding.api.java.common.streaming.Sender;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for exchanging data with an external process over a socket.
 *
 * <p>The exchange is driven by 4-byte big-endian signals read from {@link #in}:
 * <ul>
 *   <li>{@code 0} - request for the next buffer of the (single) input,</li>
 *   <li>{@code -3} / {@code -4} - request for the next buffer of input group 0 / 1,</li>
 *   <li>{@code -1} - the external process has finished,</li>
 *   <li>{@code -2} - the external process reports an error,</li>
 *   <li>any other value - handed to {@link Receiver#collectBuffer} together with the collector
 *       (presumably the size of the result data that is ready; confirm against {@code Receiver}).</li>
 * </ul>
 *
 * <p>This class only creates the {@link ServerSocket}; {@link #socket}, {@link #in} and
 * {@link #out} are not assigned here and are expected to be set up by
 * {@link #setupProcess()} implementations (assumption - verify against subclasses).
 */
public abstract class Streamer
implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(Streamer.class);
    private static final int SIGNAL_BUFFER_REQUEST = 0;
    private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
    private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
    private static final int SIGNAL_FINISHED = -1;
    private static final int SIGNAL_ERROR = -2;
    private static final byte SIGNAL_LAST = 32;
    /** Time to wait after an error signal before failing, in milliseconds. */
    private static final long ERROR_GRACE_PERIOD_MILLIS = 2000L;
    /** Holds the most recently read 4-byte signal. */
    private final byte[] buffer = new byte[4];
    protected ServerSocket server;
    protected Socket socket;
    protected InputStream in;
    protected OutputStream out;
    protected int port;
    protected Sender sender;
    protected Receiver receiver;
    /** Text appended to every failure message raised by this class. */
    protected StringBuilder msg = new StringBuilder();
    protected final AbstractRichFunction function;

    /**
     * Creates a streamer together with a {@link Sender} and a {@link Receiver} for the function.
     *
     * @param function the function on whose behalf data is exchanged; its runtime context
     *                 supplies the task name and the broadcast variables
     */
    public Streamer(AbstractRichFunction function) {
        this.function = function;
        this.sender = new Sender(function);
        this.receiver = new Receiver(function);
    }

    /**
     * Opens a server socket on an automatically chosen port and then calls
     * {@link #setupProcess()}.
     *
     * @throws IOException if the server socket cannot be created or the setup fails
     */
    public void open() throws IOException {
        this.server = new ServerSocket(0);
        this.setupProcess();
    }

    /**
     * Hook invoked by {@link #open()} after the server socket has been created.
     *
     * @throws IOException if the setup fails
     */
    public abstract void setupProcess() throws IOException;

    /**
     * Closes the socket, the server socket, the sender and the receiver.
     *
     * <p>Sockets that were never created are skipped, and every resource is closed even if
     * closing an earlier one fails.
     *
     * @throws IOException if closing any of the resources fails
     */
    public void close() throws IOException {
        try {
            if (this.socket != null) {
                this.socket.close();
            }
        } finally {
            try {
                // The server socket is created in open() and would otherwise stay bound.
                if (this.server != null) {
                    this.server.close();
                }
            } finally {
                try {
                    this.sender.close();
                } finally {
                    this.receiver.close();
                }
            }
        }
    }

    /**
     * Reads one complete 4-byte signal from {@link #in} into {@link #buffer}.
     *
     * <p>A single {@code read} call may deliver fewer bytes than requested, so this loops
     * until the signal is complete.
     *
     * @return the signal decoded as a big-endian int
     * @throws IOException if the stream ends before 4 bytes were read, or on any other I/O error
     */
    private int readSignal() throws IOException {
        int filled = 0;
        while (filled < this.buffer.length) {
            int count = this.in.read(this.buffer, filled, this.buffer.length - filled);
            if (count < 0) {
                throw new IOException("Connection to the external process was closed before a complete signal was received." + this.msg);
            }
            filled += count;
        }
        return Streamer.getInt(this.buffer, 0);
    }

    /**
     * Announces data written by the sender: 4 bytes big-endian size followed by one flag byte
     * ({@code 0} if more data follows, {@code SIGNAL_LAST} otherwise).
     *
     * @param size    the size to announce
     * @param hasNext whether more data follows
     * @throws IOException if writing to {@link #out} fails
     */
    private void sendWriteNotification(int size, boolean hasNext) throws IOException {
        byte[] tmp = new byte[5];
        Streamer.putInt(tmp, 0, size);
        tmp[4] = hasNext ? (byte) 0 : SIGNAL_LAST;
        this.out.write(tmp, 0, 5);
        this.out.flush();
    }

    /**
     * Confirms a read by writing a single zero byte to {@link #out}.
     *
     * @throws IOException if writing to {@link #out} fails
     */
    private void sendReadConfirmation() throws IOException {
        this.out.write(new byte[1], 0, 1);
        this.out.flush();
    }

    /**
     * Pauses for the grace period after an error signal.
     *
     * <p>NOTE(review): presumably this gives the external process time to deliver its error
     * output into {@link #msg} - confirm. If the thread is interrupted while waiting, the
     * interrupt flag is restored so that callers can still observe it.
     */
    private void awaitErrorOutput() {
        try {
            Thread.sleep(ERROR_GRACE_PERIOD_MILLIS);
        }
        catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the failure raised for problems with the external process.
     *
     * @param detail text placed between the task name and the contents of {@link #msg}
     * @param cause  the underlying exception, or {@code null} if there is none
     * @return the exception to throw
     */
    private RuntimeException externalProcessFailure(String detail, Throwable cause) {
        String message = "External process for task " + this.function.getRuntimeContext().getTaskName() + detail + this.msg;
        return cause == null ? new RuntimeException(message) : new RuntimeException(message, cause);
    }

    /**
     * Fails if the most recently read signal in {@link #buffer} is the error signal.
     *
     * @throws RuntimeException if the last signal was {@code SIGNAL_ERROR}
     */
    private void checkForError() {
        if (Streamer.getInt(this.buffer, 0) == SIGNAL_ERROR) {
            this.awaitErrorOutput();
            throw this.externalProcessFailure(" terminated prematurely.", null);
        }
    }

    /**
     * Sends all broadcast variables listed in the configuration.
     *
     * <p>The number of variables is read from {@code PLANBINDER_BCVAR_COUNT} and their names
     * from {@code PLANBINDER_BCVAR_<index>}. Before every send a signal is read and checked
     * for the error signal. The count is sent first; then, per variable, its name followed by
     * its contents in as many buffers as needed.
     *
     * @param config configuration holding the broadcast variable count and names
     * @throws IOException      if the communication fails or the stream ends prematurely
     * @throws RuntimeException if the external process reports an error or a socket timeout occurs
     */
    public final void sendBroadCastVariables(Configuration config) throws IOException {
        try {
            int broadcastCount = config.getInteger("PLANBINDER_BCVAR_COUNT", 0);
            String[] names = new String[broadcastCount];
            for (int x = 0; x < names.length; ++x) {
                names[x] = config.getString("PLANBINDER_BCVAR_" + x, null);
            }
            this.readSignal();
            this.checkForError();
            int size = this.sender.sendRecord(broadcastCount);
            this.sendWriteNotification(size, false);
            for (String name : names) {
                Iterator bcv = this.function.getRuntimeContext().getBroadcastVariable(name).iterator();
                this.readSignal();
                this.checkForError();
                size = this.sender.sendRecord(name);
                this.sendWriteNotification(size, false);
                while (bcv.hasNext() || this.sender.hasRemaining(0)) {
                    this.readSignal();
                    this.checkForError();
                    size = this.sender.sendBuffer(bcv, 0);
                    this.sendWriteNotification(size, bcv.hasNext() || this.sender.hasRemaining(0));
                }
                this.sender.reset();
            }
        }
        catch (SocketTimeoutException ste) {
            throw this.externalProcessFailure(" stopped responding.", ste);
        }
    }

    /**
     * Streams a single input to the external process and collects its results.
     *
     * <p>Returns immediately if the iterator is empty. Otherwise signals are processed until
     * the finished signal arrives: buffer requests are answered from the iterator, and any
     * value that is not a known signal is passed to the receiver, followed by a read
     * confirmation.
     *
     * @param i input records
     * @param c collector handed to the receiver
     * @throws IOException      if the communication fails or the stream ends prematurely
     * @throws RuntimeException if data is requested although none is available, the external
     *                          process reports an error, or a socket timeout occurs
     */
    public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
        try {
            if (!i.hasNext()) {
                return;
            }
            while (true) {
                int sig = this.readSignal();
                switch (sig) {
                    case SIGNAL_BUFFER_REQUEST: {
                        if (!i.hasNext() && !this.sender.hasRemaining(0)) {
                            throw new RuntimeException("External process requested data even though none is available.");
                        }
                        int size = this.sender.sendBuffer(i, 0);
                        this.sendWriteNotification(size, this.sender.hasRemaining(0) || i.hasNext());
                        break;
                    }
                    case SIGNAL_FINISHED: {
                        return;
                    }
                    case SIGNAL_ERROR: {
                        this.awaitErrorOutput();
                        throw this.externalProcessFailure(" terminated prematurely due to an error.", null);
                    }
                    default: {
                        this.receiver.collectBuffer(c, sig);
                        this.sendReadConfirmation();
                        break;
                    }
                }
            }
        }
        catch (SocketTimeoutException ste) {
            throw this.externalProcessFailure(" stopped responding.", ste);
        }
    }

    /**
     * Streams two inputs (group 0 and group 1) to the external process and collects its results.
     *
     * <p>Returns immediately if both iterators are empty. Otherwise signals are processed
     * until the finished signal arrives. A buffer request for a group that has no data left
     * is silently skipped.
     *
     * @param i1 input records of group 0
     * @param i2 input records of group 1
     * @param c  collector handed to the receiver
     * @throws IOException      if the communication fails or the stream ends prematurely
     * @throws RuntimeException if the external process reports an error or a socket timeout occurs
     */
    public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
        try {
            if (!i1.hasNext() && !i2.hasNext()) {
                return;
            }
            while (true) {
                int sig = this.readSignal();
                switch (sig) {
                    case SIGNAL_BUFFER_REQUEST_G0: {
                        // Note: the availability check for group 0 consults sender slot 0.
                        if (i1.hasNext() || this.sender.hasRemaining(0)) {
                            int size = this.sender.sendBuffer(i1, 0);
                            this.sendWriteNotification(size, this.sender.hasRemaining(0) || i1.hasNext());
                        }
                        break;
                    }
                    case SIGNAL_BUFFER_REQUEST_G1: {
                        if (i2.hasNext() || this.sender.hasRemaining(1)) {
                            int size = this.sender.sendBuffer(i2, 1);
                            this.sendWriteNotification(size, this.sender.hasRemaining(1) || i2.hasNext());
                        }
                        break;
                    }
                    case SIGNAL_FINISHED: {
                        return;
                    }
                    case SIGNAL_ERROR: {
                        this.awaitErrorOutput();
                        throw this.externalProcessFailure(" terminated prematurely due to an error.", null);
                    }
                    default: {
                        this.receiver.collectBuffer(c, sig);
                        this.sendReadConfirmation();
                        break;
                    }
                }
            }
        }
        catch (SocketTimeoutException ste) {
            throw this.externalProcessFailure(" stopped responding.", ste);
        }
    }

    /**
     * Decodes a big-endian int from four consecutive bytes.
     *
     * @param array  source array
     * @param offset index of the most significant byte
     * @return the decoded value
     */
    protected static final int getInt(byte[] array, int offset) {
        return array[offset] << 24 | (array[offset + 1] & 0xFF) << 16 | (array[offset + 2] & 0xFF) << 8 | array[offset + 3] & 0xFF;
    }

    /**
     * Encodes an int as four consecutive big-endian bytes.
     *
     * @param array  destination array
     * @param offset index at which the most significant byte is stored
     * @param value  the value to encode
     */
    protected static final void putInt(byte[] array, int offset, int value) {
        array[offset] = (byte)(value >> 24);
        array[offset + 1] = (byte)(value >> 16);
        array[offset + 2] = (byte)(value >> 8);
        array[offset + 3] = (byte)value;
    }
}

