package com.twitter.logging;

import com.twitter.util.Time$;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Arrays;
import scala.Predef$;
import scala.Serializable;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ScribeHandler.scala */
/* loaded from: input_file:com/twitter/logging/ScribeHandler$$anonfun$sendBatch$1$1.class */
public final class ScribeHandler$$anonfun$sendBatch$1$1 extends AbstractFunction1 implements Serializable {
    public static final long serialVersionUID = 0;
    private final ScribeHandler $outer;

    public final void apply(Socket socket) {
        byte[] bArr;
        byte[] bArr2;
        OutputStream outputStream = socket.getOutputStream();
        InputStream inputStream = socket.getInputStream();
        int size = this.$outer.queue().size();
        while (size > 0 && this.$outer.com$twitter$logging$ScribeHandler$$socket().isDefined()) {
            int min = Predef$.MODULE$.intWrapper(this.$outer.com$twitter$logging$ScribeHandler$$maxMessagesPerTransaction).min(size);
            int i = 0;
            try {
                outputStream.write(this.$outer.makeBuffer(min).array());
                bArr = this.$outer.com$twitter$logging$ScribeHandler$$archaicServer() ? this.$outer.com$twitter$logging$ScribeHandler$$OLD_SCRIBE_REPLY : this.$outer.com$twitter$logging$ScribeHandler$$SCRIBE_REPLY;
                bArr2 = new byte[bArr.length];
                while (i < bArr2.length) {
                    int read = inputStream.read(bArr2, i, bArr2.length - i);
                    if (read < 0) {
                        throw new IOException("End of stream");
                    }
                    i += read;
                    if (!this.$outer.com$twitter$logging$ScribeHandler$$archaicServer() && i > 0 && bArr2[0] == 0) {
                        this.$outer.com$twitter$logging$ScribeHandler$$archaicServer_$eq(true);
                        this.$outer.com$twitter$logging$ScribeHandler$$lastConnectAttempt_$eq(Time$.MODULE$.epoch());
                        ScribeHandler$.MODULE$.log().warning("Scribe server is archaic; changing to old protocol for future requests.", Predef$.MODULE$.genericWrapArray(new Object[0]));
                        throw new Retry();
                    }
                }
            } catch (Retry unused) {
                this.$outer.com$twitter$logging$ScribeHandler$$closeSocket();
            } catch (Exception e) {
                ScribeHandler$.MODULE$.log().error(e, "Failed to send %s %d log entries to scribe server at %s:%d", Predef$.MODULE$.genericWrapArray(new Object[]{this.$outer.com$twitter$logging$ScribeHandler$$category, BoxesRunTime.boxToInteger(min), this.$outer.com$twitter$logging$ScribeHandler$$hostname, BoxesRunTime.boxToInteger(this.$outer.com$twitter$logging$ScribeHandler$$port)}));
                this.$outer.com$twitter$logging$ScribeHandler$$closeSocket();
            }
            if (!Arrays.equals(bArr2, bArr)) {
                throw new IOException(new StringBuilder().append("Error response from scribe server: ").append(Predef$.MODULE$.byteArrayOps(bArr2).toList().toString()).toString());
            }
            this.$outer.sentRecords().getAndAdd(min);
            size -= min;
        }
        this.$outer.lastTransmission_$eq(Time$.MODULE$.now());
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((Socket) obj);
        return BoxedUnit.UNIT;
    }

    public ScribeHandler$$anonfun$sendBatch$1$1(ScribeHandler scribeHandler) {
        if (scribeHandler == null) {
            throw new NullPointerException();
        }
        this.$outer = scribeHandler;
    }
}
