/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.languagebinding.api.java.common.streaming;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.languagebinding.api.java.common.PlanBinder;

public class Sender
implements Serializable {
    public static final byte TYPE_TUPLE = 11;
    public static final byte TYPE_BOOLEAN = 10;
    public static final byte TYPE_BYTE = 9;
    public static final byte TYPE_SHORT = 8;
    public static final byte TYPE_INTEGER = 7;
    public static final byte TYPE_LONG = 6;
    public static final byte TYPE_DOUBLE = 4;
    public static final byte TYPE_FLOAT = 5;
    public static final byte TYPE_CHAR = 3;
    public static final byte TYPE_STRING = 2;
    public static final byte TYPE_BYTES = 1;
    public static final byte TYPE_NULL = 0;
    private final AbstractRichFunction function;
    private File outputFile;
    private RandomAccessFile outputRAF;
    private FileChannel outputChannel;
    private MappedByteBuffer fileBuffer;
    private final ByteBuffer[] saved = new ByteBuffer[2];
    private final Serializer[] serializer = new Serializer[2];

    public Sender(AbstractRichFunction function) {
        this.function = function;
    }

    public void open(String path) throws IOException {
        this.setupMappedFile(path);
    }

    private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
        File x = new File(PlanBinder.FLINK_TMP_DATA_DIR);
        x.mkdirs();
        this.outputFile = new File(outputFilePath);
        if (this.outputFile.exists()) {
            this.outputFile.delete();
        }
        this.outputFile.createNewFile();
        this.outputRAF = new RandomAccessFile(outputFilePath, "rw");
        this.outputRAF.setLength(0x4000000L);
        this.outputRAF.seek(0x3FFFFFFL);
        this.outputRAF.writeByte(0);
        this.outputRAF.seek(0L);
        this.outputChannel = this.outputRAF.getChannel();
        this.fileBuffer = this.outputChannel.map(FileChannel.MapMode.READ_WRITE, 0L, 0x4000000L);
    }

    public void close() throws IOException {
        this.closeMappedFile();
    }

    private void closeMappedFile() throws IOException {
        this.outputChannel.close();
        this.outputRAF.close();
    }

    public void reset() {
        this.serializer[0] = null;
        this.serializer[1] = null;
        this.fileBuffer.clear();
    }

    public int sendRecord(Object value) throws IOException {
        this.fileBuffer.clear();
        int group = 0;
        this.serializer[group] = this.getSerializer(value);
        ByteBuffer bb = this.serializer[group].serialize(value);
        if (bb.remaining() > 0x4000000) {
            throw new RuntimeException("Serialized object does not fit into a single buffer.");
        }
        this.fileBuffer.put(bb);
        int size = this.fileBuffer.position();
        this.reset();
        return size;
    }

    public boolean hasRemaining(int group) {
        return this.saved[group] != null;
    }

    public int sendBuffer(Iterator i, int group) throws IOException {
        ByteBuffer bb;
        Object value;
        this.fileBuffer.clear();
        if (this.serializer[group] == null) {
            value = i.next();
            this.serializer[group] = this.getSerializer(value);
            bb = this.serializer[group].serialize(value);
            if (bb.remaining() > 0x4000000) {
                throw new RuntimeException("Serialized object does not fit into a single buffer.");
            }
            this.fileBuffer.put(bb);
        }
        if (this.saved[group] != null) {
            this.fileBuffer.put(this.saved[group]);
            this.saved[group] = null;
        }
        while (i.hasNext() && this.saved[group] == null) {
            value = i.next();
            bb = this.serializer[group].serialize(value);
            if (bb.remaining() > 0x4000000) {
                throw new RuntimeException("Serialized object does not fit into a single buffer.");
            }
            if (bb.remaining() <= this.fileBuffer.remaining()) {
                this.fileBuffer.put(bb);
                continue;
            }
            this.saved[group] = bb;
        }
        int size = this.fileBuffer.position();
        return size;
    }

    private Serializer getSerializer(Object value) throws IOException {
        String className = value.getClass().getSimpleName().toUpperCase();
        if (className.startsWith("TUPLE")) {
            className = "TUPLE";
        }
        if (className.startsWith("BYTE[]")) {
            className = "BYTES";
        }
        SupportedTypes type = SupportedTypes.valueOf(className);
        switch (type) {
            case TUPLE: {
                this.fileBuffer.put((byte)11);
                this.fileBuffer.putInt(((Tuple)value).getArity());
                return new TupleSerializer((Tuple)value);
            }
            case BOOLEAN: {
                this.fileBuffer.put((byte)10);
                return new BooleanSerializer();
            }
            case BYTE: {
                this.fileBuffer.put((byte)9);
                return new ByteSerializer();
            }
            case BYTES: {
                this.fileBuffer.put((byte)1);
                return new BytesSerializer();
            }
            case CHARACTER: {
                this.fileBuffer.put((byte)3);
                return new CharSerializer();
            }
            case SHORT: {
                this.fileBuffer.put((byte)8);
                return new ShortSerializer();
            }
            case INTEGER: {
                this.fileBuffer.put((byte)7);
                return new IntSerializer();
            }
            case LONG: {
                this.fileBuffer.put((byte)6);
                return new LongSerializer();
            }
            case STRING: {
                this.fileBuffer.put((byte)2);
                return new StringSerializer();
            }
            case FLOAT: {
                this.fileBuffer.put((byte)5);
                return new FloatSerializer();
            }
            case DOUBLE: {
                this.fileBuffer.put((byte)4);
                return new DoubleSerializer();
            }
            case NULL: {
                this.fileBuffer.put((byte)0);
                return new NullSerializer();
            }
        }
        throw new IllegalArgumentException("Unknown Type encountered: " + (Object)((Object)type));
    }

    private class TupleSerializer
    extends Serializer<Tuple> {
        private final Serializer[] serializer;
        private final List<ByteBuffer> buffers;

        public TupleSerializer(Tuple value) throws IOException {
            super(0);
            this.serializer = new Serializer[value.getArity()];
            this.buffers = new ArrayList<ByteBuffer>();
            for (int x = 0; x < this.serializer.length; ++x) {
                this.serializer[x] = Sender.this.getSerializer(value.getField(x));
            }
        }

        @Override
        public void serializeInternal(Tuple value) {
            int length = 0;
            for (int x = 0; x < this.serializer.length; ++x) {
                this.serializer[x].buffer.clear();
                this.serializer[x].serializeInternal(value.getField(x));
                length += this.serializer[x].buffer.position();
                this.buffers.add(this.serializer[x].buffer);
            }
            this.buffer = ByteBuffer.allocate(length);
            for (ByteBuffer b : this.buffers) {
                b.flip();
                this.buffer.put(b);
            }
            this.buffers.clear();
        }
    }

    private class BytesSerializer
    extends Serializer<byte[]> {
        public BytesSerializer() {
            super(0);
        }

        @Override
        public void serializeInternal(byte[] value) {
            this.buffer = ByteBuffer.allocate(4 + value.length);
            this.buffer.putInt(value.length);
            this.buffer.put(value);
        }
    }

    private class NullSerializer
    extends Serializer<Object> {
        public NullSerializer() {
            super(0);
        }

        @Override
        public void serializeInternal(Object value) {
        }
    }

    private class DoubleSerializer
    extends Serializer<Double> {
        public DoubleSerializer() {
            super(8);
        }

        @Override
        public void serializeInternal(Double value) {
            this.buffer.putDouble(value);
        }
    }

    private class FloatSerializer
    extends Serializer<Float> {
        public FloatSerializer() {
            super(4);
        }

        @Override
        public void serializeInternal(Float value) {
            this.buffer.putFloat(value.floatValue());
        }
    }

    private class StringSerializer
    extends Serializer<String> {
        public StringSerializer() {
            super(0);
        }

        @Override
        public void serializeInternal(String value) {
            byte[] bytes = value.getBytes();
            this.buffer = ByteBuffer.allocate(bytes.length + 4);
            this.buffer.putInt(bytes.length);
            this.buffer.put(bytes);
        }
    }

    private class LongSerializer
    extends Serializer<Long> {
        public LongSerializer() {
            super(8);
        }

        @Override
        public void serializeInternal(Long value) {
            this.buffer.putLong(value);
        }
    }

    private class IntSerializer
    extends Serializer<Integer> {
        public IntSerializer() {
            super(4);
        }

        @Override
        public void serializeInternal(Integer value) {
            this.buffer.putInt(value);
        }
    }

    private class ShortSerializer
    extends Serializer<Short> {
        public ShortSerializer() {
            super(2);
        }

        @Override
        public void serializeInternal(Short value) {
            this.buffer.putShort(value);
        }
    }

    private class CharSerializer
    extends Serializer<Character> {
        public CharSerializer() {
            super(4);
        }

        @Override
        public void serializeInternal(Character value) {
            this.buffer.put((value + "").getBytes());
        }
    }

    private class BooleanSerializer
    extends Serializer<Boolean> {
        public BooleanSerializer() {
            super(1);
        }

        @Override
        public void serializeInternal(Boolean value) {
            this.buffer.put(value != false ? (byte)1 : 0);
        }
    }

    private class ByteSerializer
    extends Serializer<Byte> {
        public ByteSerializer() {
            super(1);
        }

        @Override
        public void serializeInternal(Byte value) {
            this.buffer.put(value);
        }
    }

    private abstract class Serializer<T> {
        protected ByteBuffer buffer;

        public Serializer(int capacity) {
            this.buffer = ByteBuffer.allocate(capacity);
        }

        public ByteBuffer serialize(T value) {
            this.buffer.clear();
            this.serializeInternal(value);
            this.buffer.flip();
            return this.buffer;
        }

        public abstract void serializeInternal(T var1);
    }

    private static enum SupportedTypes {
        TUPLE,
        BOOLEAN,
        BYTE,
        BYTES,
        CHARACTER,
        SHORT,
        INTEGER,
        LONG,
        FLOAT,
        DOUBLE,
        STRING,
        OTHER,
        NULL;

    }
}

