package org.apache.flink.languagebinding.api.java.common.streaming;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.languagebinding.api.java.common.PlanBinder;

/* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender.class */
public class Sender implements Serializable {
    // Type tags. getSerializer(Object) writes these same numeric values (as byte
    // literals) into the mapped buffer before the payload of the first value.
    public static final byte TYPE_TUPLE = 11;
    public static final byte TYPE_BOOLEAN = 10;
    public static final byte TYPE_BYTE = 9;
    public static final byte TYPE_SHORT = 8;
    public static final byte TYPE_INTEGER = 7;
    public static final byte TYPE_LONG = 6;
    public static final byte TYPE_DOUBLE = 4;
    public static final byte TYPE_FLOAT = 5;
    public static final byte TYPE_CHAR = 3;
    public static final byte TYPE_STRING = 2;
    public static final byte TYPE_BYTES = 1;
    public static final byte TYPE_NULL = 0;
    // Stored by the constructor; not read anywhere else in this class.
    private final AbstractRichFunction function;
    // The following four fields are assigned in setupMappedFile(String).
    private File outputFile;
    private RandomAccessFile outputRAF;
    private FileChannel outputChannel;
    // Memory-mapped view of the output file; every send method writes into it.
    private MappedByteBuffer fileBuffer;
    // Per-index buffer holding a serialized value that did not fit during sendBuffer().
    private final ByteBuffer[] saved = new ByteBuffer[2];
    // Per-index serializer chosen from the first value sent; cleared by reset().
    private final Serializer[] serializer = new Serializer[2];

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.languagebinding.api.java.common.streaming.Sender$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$1.class */
    /**
     * Compiler-generated lookup table (recovered by the decompiler) that backs the
     * {@code switch} over {@link SupportedTypes} in {@code getSerializer(Object)}.
     *
     * <p>The array is indexed by {@code ordinal()} and holds the case label used by that
     * switch. {@code SupportedTypes.OTHER} is never assigned, so its slot keeps the array
     * default of 0 and the switch falls through to its {@code default} branch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes = new int[SupportedTypes.values().length];

        static {
            // Each assignment is wrapped separately; this is the shape javac emits for enum
            // switches. NOTE(review): presumably this tolerates enum constants that are
            // missing at runtime - the empty catch blocks are intentional, do not "fix" them.
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.TUPLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.BOOLEAN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.BYTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.BYTES.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.CHARACTER.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.SHORT.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.INTEGER.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.LONG.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.STRING.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.FLOAT.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.DOUBLE.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$flink$languagebinding$api$java$common$streaming$Sender$SupportedTypes[SupportedTypes.NULL.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$BooleanSerializer.class */
    /**
     * Serializer for {@link Boolean} values; every value occupies exactly one byte.
     */
    public class BooleanSerializer extends Serializer<Boolean> {
        public BooleanSerializer() {
            super(1);
        }

        /**
         * Writes {@code 1} for {@code true} and {@code 0} for {@code false}.
         *
         * @param value the boolean to encode; must not be {@code null}
         */
        @Override
        public void serializeInternal(Boolean value) {
            byte encoded = 0;
            if (value.booleanValue()) {
                encoded = 1;
            }
            buffer.put(encoded);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$ByteSerializer.class */
    /**
     * Serializer for {@link Byte} values; the byte is written unchanged.
     */
    public class ByteSerializer extends Serializer<Byte> {
        public ByteSerializer() {
            super(1);
        }

        /**
         * Writes the single byte held by {@code value}.
         *
         * @param value the byte to write; must not be {@code null}
         */
        @Override
        public void serializeInternal(Byte value) {
            final byte raw = value.byteValue();
            buffer.put(raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$BytesSerializer.class */
    /**
     * Serializer for {@code byte[]} values, written as a 4-byte length followed by the
     * array contents.
     */
    public class BytesSerializer extends Serializer<byte[]> {
        public BytesSerializer() {
            super(0);
        }

        /**
         * Replaces the internal buffer with a freshly allocated one sized for this array.
         *
         * @param payload the bytes to frame; must not be {@code null}
         */
        @Override
        public void serializeInternal(byte[] payload) {
            final int length = payload.length;
            // A new buffer per call: the required size depends on the array length.
            ByteBuffer framed = ByteBuffer.allocate(4 + length);
            framed.putInt(length);
            framed.put(payload);
            buffer = framed;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$CharSerializer.class */
    /**
     * Serializer for {@link Character} values. The character is converted to a
     * one-character string and written as its UTF-8 bytes into a 4-byte buffer.
     */
    public class CharSerializer extends Serializer<Character> {
        public CharSerializer() {
            super(4);
        }

        /**
         * Writes the UTF-8 encoding of the character.
         *
         * <p>The charset is fixed explicitly so the bytes do not depend on the JVM's
         * platform default encoding. NOTE(review): the reader on the other side of the
         * mapped file is assumed to decode UTF-8 - confirm.
         *
         * @param ch the character to encode
         */
        @Override
        public void serializeInternal(Character ch) {
            this.buffer.put(String.valueOf(ch).getBytes(StandardCharsets.UTF_8));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$DoubleSerializer.class */
    /**
     * Serializer for {@link Double} values; each value occupies eight bytes.
     */
    public class DoubleSerializer extends Serializer<Double> {
        public DoubleSerializer() {
            super(8);
        }

        /**
         * Writes the value with {@link java.nio.ByteBuffer#putDouble(double)}.
         *
         * @param value the double to write; must not be {@code null}
         */
        @Override
        public void serializeInternal(Double value) {
            final double raw = value.doubleValue();
            buffer.putDouble(raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$FloatSerializer.class */
    /**
     * Serializer for {@link Float} values; each value occupies four bytes.
     */
    public class FloatSerializer extends Serializer<Float> {
        public FloatSerializer() {
            super(4);
        }

        /**
         * Writes the value with {@link java.nio.ByteBuffer#putFloat(float)}.
         *
         * @param value the float to write; must not be {@code null}
         */
        @Override
        public void serializeInternal(Float value) {
            final float raw = value.floatValue();
            buffer.putFloat(raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$IntSerializer.class */
    /**
     * Serializer for {@link Integer} values; each value occupies four bytes.
     */
    public class IntSerializer extends Serializer<Integer> {
        public IntSerializer() {
            super(4);
        }

        /**
         * Writes the value with {@link java.nio.ByteBuffer#putInt(int)}.
         *
         * @param value the integer to write; must not be {@code null}
         */
        @Override
        public void serializeInternal(Integer value) {
            final int raw = value.intValue();
            buffer.putInt(raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$LongSerializer.class */
    /**
     * Serializer for {@link Long} values; each value occupies eight bytes.
     */
    public class LongSerializer extends Serializer<Long> {
        public LongSerializer() {
            super(8);
        }

        /**
         * Writes the value with {@link java.nio.ByteBuffer#putLong(long)}.
         *
         * @param value the long to write; must not be {@code null}
         */
        @Override
        public void serializeInternal(Long value) {
            final long raw = value.longValue();
            buffer.putLong(raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$NullSerializer.class */
    /**
     * Serializer that produces no payload bytes: its buffer has capacity zero and
     * nothing is ever written to it.
     */
    public class NullSerializer extends Serializer<Object> {
        public NullSerializer() {
            super(0);
        }

        /**
         * Intentionally writes nothing.
         *
         * @param ignored the value, which is not inspected
         */
        @Override
        public void serializeInternal(Object ignored) {
            // No payload to emit.
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$Serializer.class */
    public abstract class Serializer<T> {
        protected ByteBuffer buffer;

        public Serializer(int i) {
            this.buffer = ByteBuffer.allocate(i);
        }

        public ByteBuffer serialize(T t) {
            this.buffer.clear();
            serializeInternal(t);
            this.buffer.flip();
            return this.buffer;
        }

        public abstract void serializeInternal(T t);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$ShortSerializer.class */
    /**
     * Serializer for {@link Short} values; each value occupies two bytes.
     */
    public class ShortSerializer extends Serializer<Short> {
        public ShortSerializer() {
            super(2);
        }

        /**
         * Writes the value with {@link java.nio.ByteBuffer#putShort(short)}.
         *
         * @param value the short to write; must not be {@code null}
         */
        @Override
        public void serializeInternal(Short value) {
            final short raw = value.shortValue();
            buffer.putShort(raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$StringSerializer.class */
    /**
     * Serializer for {@link String} values, written as a 4-byte length followed by the
     * string's UTF-8 bytes.
     */
    public class StringSerializer extends Serializer<String> {
        public StringSerializer() {
            super(0);
        }

        /**
         * Replaces the internal buffer with one sized for this string and fills it.
         *
         * <p>The charset is fixed explicitly so the bytes (and the length prefix) do not
         * depend on the JVM's platform default encoding. NOTE(review): the reader on the
         * other side of the mapped file is assumed to decode UTF-8 - confirm.
         *
         * @param str the string to encode; must not be {@code null}
         */
        @Override
        public void serializeInternal(String str) {
            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
            this.buffer = ByteBuffer.allocate(bytes.length + 4);
            this.buffer.putInt(bytes.length);
            this.buffer.put(bytes);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$SupportedTypes.class */
    /**
     * Value categories recognised by {@code getSerializer(Object)}.
     *
     * <p>{@code getSerializer} derives a constant name from the upper-cased simple class
     * name of the value (names starting with "TUPLE" map to {@link #TUPLE}, names starting
     * with "BYTE[]" map to {@link #BYTES}) and resolves it with {@code valueOf}.
     * {@link #OTHER} has no serializer: it reaches the {@code default} branch there and
     * is rejected with an {@link IllegalArgumentException}.
     */
    public enum SupportedTypes {
        TUPLE,
        BOOLEAN,
        BYTE,
        BYTES,
        CHARACTER,
        SHORT,
        INTEGER,
        LONG,
        FLOAT,
        DOUBLE,
        STRING,
        OTHER,
        NULL
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Sender$TupleSerializer.class */
    /**
     * Serializer for {@link Tuple} values. One field serializer is chosen per tuple
     * position from the tuple passed to the constructor; every later tuple is written as
     * the concatenation of its fields' payloads in field order.
     */
    @SuppressWarnings("rawtypes")
    public class TupleSerializer extends Serializer<Tuple> {
        /** Field serializers, indexed by tuple position. */
        private final Serializer[] serializer;

        /**
         * Selects a serializer for each field of {@code tuple} via
         * {@code Sender.this.getSerializer}, which also writes each field's type tag
         * into the mapped buffer.
         *
         * @param tuple sample tuple whose arity and field types define the layout
         * @throws IOException declared by {@code getSerializer}
         */
        public TupleSerializer(Tuple tuple) throws IOException {
            super(0);
            this.serializer = new Serializer[tuple.getArity()];
            for (int i = 0; i < this.serializer.length; i++) {
                this.serializer[i] = Sender.this.getSerializer(tuple.getField(i));
            }
        }

        /**
         * Serializes every field and replaces {@link #buffer} with a buffer holding the
         * concatenated field payloads.
         *
         * <p>No state is kept between calls, so a field serializer that throws midway
         * cannot leave stale buffers behind for the next tuple.
         *
         * @param tuple the tuple to serialize; its fields must match the types seen by
         *              the constructor
         */
        @Override
        @SuppressWarnings("unchecked")
        public void serializeInternal(Tuple tuple) {
            // Pass 1: serialize each field into its own buffer and total the sizes.
            int totalSize = 0;
            for (int idx = 0; idx < this.serializer.length; idx++) {
                Serializer fieldSerializer = this.serializer[idx];
                fieldSerializer.buffer.clear();
                fieldSerializer.serializeInternal(tuple.getField(idx));
                totalSize += fieldSerializer.buffer.position();
            }
            // Pass 2: copy the field payloads, in order, into one exactly-sized buffer.
            ByteBuffer combined = ByteBuffer.allocate(totalSize);
            for (Serializer fieldSerializer : this.serializer) {
                fieldSerializer.buffer.flip();
                combined.put(fieldSerializer.buffer);
            }
            this.buffer = combined;
        }
    }

    /**
     * Creates a sender bound to the given function. No file is opened until
     * {@link #open(String)} is called.
     *
     * @param function the rich function this sender is associated with
     */
    public Sender(AbstractRichFunction function) {
        this.function = function;
    }

    /**
     * Opens the sender by creating and memory-mapping the output file.
     *
     * @param str path of the file to create and map
     * @throws IOException if the file cannot be created, sized or mapped
     */
    public void open(String str) throws IOException {
        setupMappedFile(str);
    }

    /**
     * Creates a fresh 64 MiB file at {@code str} and maps it read/write into
     * {@code fileBuffer}.
     *
     * <p>The channel and file fields are only assigned once mapping has succeeded. If
     * sizing or mapping fails, the newly opened file handle is closed before the
     * exception propagates, so a failed open does not leak a descriptor.
     *
     * @param str path of the file to create and map
     * @throws FileNotFoundException if the file cannot be opened for read/write
     * @throws IOException if sizing or mapping the file fails
     */
    private void setupMappedFile(String str) throws FileNotFoundException, IOException {
        final long mappedFileSize = 64L * 1024L * 1024L;

        // Best effort, as before: the results of mkdirs/delete/createNewFile are not
        // checked because opening the file in "rw" mode below creates or reuses it.
        new File(PlanBinder.FLINK_TMP_DATA_DIR).mkdirs();
        this.outputFile = new File(str);
        if (this.outputFile.exists()) {
            this.outputFile.delete();
        }
        this.outputFile.createNewFile();

        RandomAccessFile raf = new RandomAccessFile(str, "rw");
        boolean mapped = false;
        try {
            raf.setLength(mappedFileSize);
            // Touch the last byte, then rewind to the start.
            raf.seek(mappedFileSize - 1L);
            raf.writeByte(0);
            raf.seek(0L);
            FileChannel channel = raf.getChannel();
            this.fileBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0L, mappedFileSize);
            this.outputRAF = raf;
            this.outputChannel = channel;
            mapped = true;
        } finally {
            if (!mapped) {
                try {
                    raf.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Closes the channel and file backing the mapped buffer. Calling this on a sender
     * that was never opened is a no-op.
     *
     * @throws IOException if closing the channel or the file fails
     */
    public void close() throws IOException {
        closeMappedFile();
    }

    /**
     * Closes the channel, then the file. The file is closed even when closing the
     * channel throws, and handles that were never assigned are skipped.
     *
     * @throws IOException if closing the channel or the file fails
     */
    private void closeMappedFile() throws IOException {
        try {
            if (this.outputChannel != null) {
                this.outputChannel.close();
            }
        } finally {
            if (this.outputRAF != null) {
                this.outputRAF.close();
            }
        }
    }

    /**
     * Forgets the cached serializers for every slot and rewinds the mapped buffer.
     * Buffers held in {@code saved} are left untouched.
     */
    public void reset() {
        for (int slot = 0; slot < serializer.length; slot++) {
            serializer[slot] = null;
        }
        fileBuffer.clear();
    }

    /**
     * Writes a single value (type header plus payload) at the start of the mapped buffer.
     *
     * <p>Slot 0 of the serializer cache is used as scratch space, and {@link #reset()} is
     * called afterwards, which clears the cached serializers for both slots.
     *
     * @param obj the value to send
     * @return number of bytes written to the mapped buffer
     * @throws IOException declared by {@code getSerializer}
     * @throws RuntimeException if header and payload together exceed the mapped buffer
     */
    public int sendRecord(Object obj) throws IOException {
        this.fileBuffer.clear();
        // getSerializer already writes the type header into fileBuffer.
        this.serializer[0] = getSerializer(obj);
        ByteBuffer payload = this.serializer[0].serialize(obj);
        // Compare against the space left after the header, not the raw mapping size;
        // otherwise a near-capacity record fails with a bare BufferOverflowException.
        if (payload.remaining() > this.fileBuffer.remaining()) {
            throw new RuntimeException("Serialized object does not fit into a single buffer.");
        }
        this.fileBuffer.put(payload);
        int written = this.fileBuffer.position();
        reset();
        return written;
    }

    /**
     * Tells whether a serialized value is still waiting to be written for a slot.
     *
     * @param i slot index into the saved-buffer array
     * @return {@code true} if a buffer is currently saved for slot {@code i}
     */
    public boolean hasRemaining(int i) {
        final ByteBuffer pending = saved[i];
        return pending != null;
    }

    /**
     * Fills the mapped buffer with as many values from {@code it} as fit.
     *
     * <p>On the first call for a slot (no cached serializer) the first value is consumed
     * to pick the serializer, which also writes the type header. A value that does not
     * fit is kept in {@code saved[i]} and written first on the next call; see
     * {@link #hasRemaining(int)}.
     *
     * @param it source of values; when no serializer is cached for the slot it must
     *           yield at least one element
     * @param i  slot index (serializer cache and saved buffer)
     * @return number of bytes written to the mapped buffer
     * @throws IOException declared by {@code getSerializer}
     * @throws RuntimeException if a single value can never fit into the mapped buffer
     * @throws java.util.NoSuchElementException if no serializer is cached and {@code it}
     *         is empty
     */
    public int sendBuffer(Iterator it, int i) throws IOException {
        this.fileBuffer.clear();
        if (this.serializer[i] == null) {
            Object first = it.next();
            // getSerializer writes the type header into fileBuffer.
            this.serializer[i] = getSerializer(first);
            ByteBuffer firstPayload = this.serializer[i].serialize(first);
            // Check against the space left after the header rather than the raw mapping
            // size, so an oversized first record gets the descriptive error.
            if (firstPayload.remaining() > this.fileBuffer.remaining()) {
                throw new RuntimeException("Serialized object does not fit into a single buffer.");
            }
            this.fileBuffer.put(firstPayload);
        }
        // Write the carried-over value only if it fits behind what is already there;
        // otherwise it stays saved, the loop below is skipped, and it goes out next call.
        if (this.saved[i] != null && this.saved[i].remaining() <= this.fileBuffer.remaining()) {
            this.fileBuffer.put(this.saved[i]);
            this.saved[i] = null;
        }
        while (it.hasNext() && this.saved[i] == null) {
            ByteBuffer payload = this.serializer[i].serialize(it.next());
            if (payload.remaining() > 67108864) {
                throw new RuntimeException("Serialized object does not fit into a single buffer.");
            }
            if (payload.remaining() <= this.fileBuffer.remaining()) {
                this.fileBuffer.put(payload);
            } else {
                // Buffer is full: keep this value for the next call and stop.
                this.saved[i] = payload;
            }
        }
        return this.fileBuffer.position();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Chooses the serializer for a value and writes the value's type header into the
     * mapped buffer.
     *
     * <p>The header is a single type-tag byte; for tuples it is followed by the arity as
     * an int, and constructing the tuple serializer then writes the tags of the fields.
     * The type is derived from the upper-cased simple class name, using a fixed locale so
     * the lookup does not depend on the JVM's default locale.
     *
     * @param obj the value to be serialized; {@code null} is written with the
     *            {@code TYPE_NULL} tag
     * @return a new serializer matching the type of {@code obj}
     * @throws IOException declared for the tuple serializer's constructor
     * @throws IllegalArgumentException if the type of {@code obj} is not supported
     */
    public Serializer getSerializer(Object obj) throws IOException {
        if (obj == null) {
            this.fileBuffer.put(TYPE_NULL);
            return new NullSerializer();
        }
        // Locale.ROOT: under e.g. a Turkish default locale "String" would upper-case
        // to "STRİNG" and fail the enum lookup below.
        String typeName = obj.getClass().getSimpleName().toUpperCase(Locale.ROOT);
        if (typeName.startsWith("TUPLE")) {
            typeName = "TUPLE";
        }
        if (typeName.startsWith("BYTE[]")) {
            typeName = "BYTES";
        }
        SupportedTypes type = SupportedTypes.valueOf(typeName);
        switch (type) {
            case TUPLE:
                this.fileBuffer.put(TYPE_TUPLE);
                this.fileBuffer.putInt(((Tuple) obj).getArity());
                return new TupleSerializer((Tuple) obj);
            case BOOLEAN:
                this.fileBuffer.put(TYPE_BOOLEAN);
                return new BooleanSerializer();
            case BYTE:
                this.fileBuffer.put(TYPE_BYTE);
                return new ByteSerializer();
            case BYTES:
                this.fileBuffer.put(TYPE_BYTES);
                return new BytesSerializer();
            case CHARACTER:
                this.fileBuffer.put(TYPE_CHAR);
                return new CharSerializer();
            case SHORT:
                this.fileBuffer.put(TYPE_SHORT);
                return new ShortSerializer();
            case INTEGER:
                this.fileBuffer.put(TYPE_INTEGER);
                return new IntSerializer();
            case LONG:
                this.fileBuffer.put(TYPE_LONG);
                return new LongSerializer();
            case STRING:
                this.fileBuffer.put(TYPE_STRING);
                return new StringSerializer();
            case FLOAT:
                this.fileBuffer.put(TYPE_FLOAT);
                return new FloatSerializer();
            case DOUBLE:
                this.fileBuffer.put(TYPE_DOUBLE);
                return new DoubleSerializer();
            case NULL:
                this.fileBuffer.put(TYPE_NULL);
                return new NullSerializer();
            default:
                throw new IllegalArgumentException("Unknown Type encountered: " + type);
        }
    }
}
