package org.apache.skywalking.apm.toolkit.logging.common.log;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.dependencies.io.grpc.ManagedChannel;
import org.apache.skywalking.apm.dependencies.io.grpc.ManagedChannelBuilder;
import org.apache.skywalking.apm.dependencies.io.grpc.StatusRuntimeException;
import org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
import org.apache.skywalking.apm.toolkit.logging.common.log.ToolkitConfig;

@OverrideImplementor(LogReportServiceClient.class)
/* loaded from: input_file:org/apache/skywalking/apm/toolkit/logging/common/log/GRPCLogReportServiceClient.class */
public class GRPCLogReportServiceClient extends LogReportServiceClient {
    private static final ILog LOGGER = LogManager.getLogger(GRPCLogReportServiceClient.class);
    private volatile DataCarrier<LogData> carrier;
    private LogReportServiceGrpc.LogReportServiceStub asyncStub;
    private ManagedChannel channel;
    private AtomicBoolean disconnected = new AtomicBoolean(false);

    public void boot() throws Throwable {
        this.carrier = new DataCarrier<>("gRPC-log", "gRPC-log", Config.Buffer.CHANNEL_SIZE, Config.Buffer.BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
        this.carrier.consume(this, 1);
        this.channel = ManagedChannelBuilder.forAddress(ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.SERVER_HOST, ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.SERVER_PORT).usePlaintext().build();
        this.asyncStub = LogReportServiceGrpc.newStub(this.channel).withMaxOutboundMessageSize(ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.MAX_MESSAGE_SIZE);
    }

    public void shutdown() {
        try {
            this.carrier.shutdownConsumers();
            if (this.channel != null) {
                this.channel.shutdownNow();
            }
        } catch (Throwable th) {
            LOGGER.error(th.getMessage(), th);
        }
    }

    public void produce(LogData logData) {
        if (Objects.nonNull(logData) && !this.carrier.produce(logData) && LOGGER.isDebugEnable()) {
            LOGGER.debug("One log has been abandoned, cause by buffer is full.");
        }
    }

    public void consume(final List<LogData> list) {
        if (CollectionUtil.isEmpty(list)) {
            return;
        }
        StreamObserver streamObserver = null;
        final GRPCStreamServiceStatus gRPCStreamServiceStatus = new GRPCStreamServiceStatus(false);
        try {
            try {
                streamObserver = this.asyncStub.withDeadlineAfter(ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { // from class: org.apache.skywalking.apm.toolkit.logging.common.log.GRPCLogReportServiceClient.1
                    public void onNext(Commands commands) {
                    }

                    public void onError(Throwable th) {
                        gRPCStreamServiceStatus.finished();
                        if (GRPCLogReportServiceClient.this.disconnected.compareAndSet(false, true)) {
                            GRPCLogReportServiceClient.LOGGER.error("Send log to gRPC server fail with an internal exception.", th);
                        }
                        GRPCLogReportServiceClient.LOGGER.error(th, "Try to send {} log data to collector, with unexpected exception.", new Object[]{Integer.valueOf(list.size())});
                    }

                    public void onCompleted() {
                        GRPCLogReportServiceClient.this.disconnected.compareAndSet(true, false);
                        gRPCStreamServiceStatus.finished();
                    }
                });
                Iterator<LogData> it = list.iterator();
                while (it.hasNext()) {
                    streamObserver.onNext(it.next());
                }
                if (streamObserver != null) {
                    streamObserver.onCompleted();
                }
                gRPCStreamServiceStatus.wait4Finish();
            } catch (Throwable th) {
                if (!(th instanceof StatusRuntimeException)) {
                    LOGGER.error(th, "Report log failure with the gRPC client.", new Object[0]);
                }
                if (streamObserver != null) {
                    streamObserver.onCompleted();
                }
                gRPCStreamServiceStatus.wait4Finish();
            }
        } catch (Throwable th2) {
            if (streamObserver != null) {
                streamObserver.onCompleted();
            }
            gRPCStreamServiceStatus.wait4Finish();
            throw th2;
        }
    }
}
