package org.apache.gobblin.async;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/gobblin/async/AsyncDataDispatcher.class */
public abstract class AsyncDataDispatcher<D> extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDataDispatcher.class);
    private final BlockingQueue<D> buffer;
    private final Lock lock = new ReentrantLock(true);
    private final Condition isBufferEmpty = this.lock.newCondition();

    public AsyncDataDispatcher(int i) {
        this.buffer = new ArrayBlockingQueue(i);
        startAsync();
        awaitRunning();
    }

    protected abstract void dispatch(Queue<D> queue) throws DispatchException;

    /* JADX INFO: Access modifiers changed from: protected */
    public void put(D d) {
        checkRunning("put");
        try {
            this.buffer.put(d);
            if (isRunning()) {
                return;
            }
            this.buffer.clear();
            RuntimeException runtimeException = new RuntimeException("Attempt to operate when writer is " + state().name());
            LOG.error("put", runtimeException);
            throw runtimeException;
        } catch (InterruptedException e) {
            throw new RuntimeException("Waiting to put a record interrupted", e);
        }
    }

    /* JADX WARN: Can't wrap try/catch for region: R(6:2|(2:4|(2:13|14)(4:6|7|9|10))|16|17|19|10) */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x0045, code lost:
    
        r5 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x0046, code lost:
    
        org.apache.gobblin.async.AsyncDataDispatcher.LOG.error("Dispatch incurs an exception", r5);
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x0055, code lost:
    
        if (r5.isFatal() != false) goto L23;
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x0058, code lost:
    
        stopAsync();
        r4.buffer.clear();
        notifyBufferEmptyOccurrence();
     */
    /* JADX WARN: Code restructure failed: missing block: B:25:0x006b, code lost:
    
        throw r5;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected void run() throws java.lang.Exception {
        /*
            r4 = this;
            org.slf4j.Logger r0 = org.apache.gobblin.async.AsyncDataDispatcher.LOG
            java.lang.String r1 = "Start processing records"
            r0.info(r1)
        La:
            r0 = r4
            java.util.concurrent.BlockingQueue<D> r0 = r0.buffer
            boolean r0 = r0.isEmpty()
            if (r0 == 0) goto L3a
            r0 = r4
            r0.notifyBufferEmptyOccurrence()
            r0 = r4
            boolean r0 = r0.isRunning()
            if (r0 != 0) goto L22
            return
        L22:
            r0 = 300(0x12c, double:1.48E-321)
            java.lang.Thread.sleep(r0)     // Catch: java.lang.InterruptedException -> L2b
            goto La
        L2b:
            r5 = move-exception
            org.slf4j.Logger r0 = org.apache.gobblin.async.AsyncDataDispatcher.LOG
            java.lang.String r1 = "Dispatcher sleep interrupted"
            r2 = r5
            r0.warn(r1, r2)
            goto L3a
        L3a:
            r0 = r4
            r1 = r4
            java.util.concurrent.BlockingQueue<D> r1 = r1.buffer     // Catch: org.apache.gobblin.async.DispatchException -> L45
            r0.dispatch(r1)     // Catch: org.apache.gobblin.async.DispatchException -> L45
            goto La
        L45:
            r5 = move-exception
            org.slf4j.Logger r0 = org.apache.gobblin.async.AsyncDataDispatcher.LOG
            java.lang.String r1 = "Dispatch incurs an exception"
            r2 = r5
            r0.error(r1, r2)
            r0 = r5
            boolean r0 = r0.isFatal()
            if (r0 == 0) goto L6c
            r0 = r4
            com.google.common.util.concurrent.Service r0 = r0.stopAsync()
            r0 = r4
            java.util.concurrent.BlockingQueue<D> r0 = r0.buffer
            r0.clear()
            r0 = r4
            r0.notifyBufferEmptyOccurrence()
            r0 = r5
            throw r0
        L6c:
            goto La
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.gobblin.async.AsyncDataDispatcher.run():void");
    }

    public void terminate() {
        stopAsync().awaitTerminated();
    }

    protected void checkRunning(String str) {
        if (isRunning()) {
            return;
        }
        RuntimeException runtimeException = new RuntimeException("Attempt to operate when writer is " + state().name());
        LOG.error(str, runtimeException);
        throw runtimeException;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForBufferEmpty() {
        checkRunning("waitForBufferEmpty");
        try {
            this.lock.lock();
            while (!this.buffer.isEmpty()) {
                try {
                    this.isBufferEmpty.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException("Waiting for buffer flush interrupted", e);
                }
            }
        } finally {
            this.lock.unlock();
            checkRunning("waitForBufferEmpty");
        }
    }

    private void notifyBufferEmptyOccurrence() {
        try {
            this.lock.lock();
            this.isBufferEmpty.signalAll();
        } finally {
            this.lock.unlock();
        }
    }
}
