package co.cask.cdap.filetailer.sink;

import co.cask.cdap.client.StreamWriter;
import co.cask.cdap.filetailer.AbstractWorker;
import co.cask.cdap.filetailer.PipeListener;
import co.cask.cdap.filetailer.event.FileTailerEvent;
import co.cask.cdap.filetailer.metrics.FileTailerMetricsProcessor;
import co.cask.cdap.filetailer.queue.FileTailerQueue;
import co.cask.cdap.filetailer.state.FileTailerStateProcessor;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/filetailer/sink/FileTailerSink.class */
public class FileTailerSink extends AbstractWorker {
    private static final Logger LOG = LoggerFactory.getLogger(FileTailerSink.class);
    private static final int DEFAULT_PACK_SIZE = 1;
    private static final int MAX_RETRY_COUNT = 3;
    private final FileTailerQueue queue;
    private final SinkStrategy strategy;
    private final StreamWriter writer;
    private final int packSize;
    private final Random random;
    private final FileTailerStateProcessor stateProcessor;
    private final FileTailerMetricsProcessor metricsProcessor;
    private PipeListener pipeListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/cask/cdap/filetailer/sink/FileTailerSink$WriteCallback.class */
    public class WriteCallback implements FutureCallback<Void> {
        private final FileTailerEvent event;
        private final UploadLatch latch;
        private final int maxRetryCount;
        private final int retryCount;
        private final long sendStartTime;

        WriteCallback(FileTailerSink fileTailerSink, FileTailerEvent fileTailerEvent, UploadLatch uploadLatch, int i, long j) {
            this(fileTailerEvent, uploadLatch, i, FileTailerSink.DEFAULT_PACK_SIZE, j);
        }

        WriteCallback(FileTailerEvent fileTailerEvent, UploadLatch uploadLatch, int i, int i2, long j) {
            this.event = fileTailerEvent;
            this.latch = uploadLatch;
            this.maxRetryCount = i;
            this.retryCount = i2;
            this.sendStartTime = j;
        }

        public void onSuccess(Void r7) {
            FileTailerSink.LOG.debug("Event {} successfully uploaded", this.event);
            FileTailerSink.this.metricsProcessor.onIngestEventMetric((int) (System.currentTimeMillis() - this.sendStartTime));
            this.latch.reportSuccess();
        }

        public void onFailure(Throwable th) {
            if (this.maxRetryCount == this.retryCount) {
                FileTailerSink.LOG.debug("Failed to upload event {}", this.event);
                this.latch.reportFailure(this.event);
                return;
            }
            try {
                FileTailerSink.this.uploadEvent(this.latch, this.event, this.retryCount + FileTailerSink.DEFAULT_PACK_SIZE);
            } catch (IOException e) {
                FileTailerSink.LOG.debug("Failed to upload event", e);
                this.latch.reportFailure(this.event);
            }
        }
    }

    public FileTailerSink(FileTailerQueue fileTailerQueue, StreamWriter streamWriter, SinkStrategy sinkStrategy, FileTailerStateProcessor fileTailerStateProcessor, FileTailerMetricsProcessor fileTailerMetricsProcessor, PipeListener pipeListener) {
        this(fileTailerQueue, streamWriter, sinkStrategy, fileTailerStateProcessor, fileTailerMetricsProcessor, pipeListener, DEFAULT_PACK_SIZE);
    }

    public FileTailerSink(FileTailerQueue fileTailerQueue, StreamWriter streamWriter, SinkStrategy sinkStrategy, FileTailerStateProcessor fileTailerStateProcessor, FileTailerMetricsProcessor fileTailerMetricsProcessor, PipeListener pipeListener, int i) {
        this.stateProcessor = fileTailerStateProcessor;
        this.metricsProcessor = fileTailerMetricsProcessor;
        this.queue = fileTailerQueue;
        this.writer = streamWriter;
        this.strategy = sinkStrategy;
        this.packSize = i;
        this.random = new Random();
        this.pipeListener = pipeListener;
    }

    /* JADX WARN: Code restructure failed: missing block: B:13:0x004a, code lost:
    
        if (r0.isEmpty() != false) goto L14;
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x004d, code lost:
    
        uploadEventPack(r0);
        co.cask.cdap.filetailer.sink.FileTailerSink.LOG.debug("Saving File Tailer state");
        r4.stateProcessor.saveState(r0.getState());
        co.cask.cdap.filetailer.sink.FileTailerSink.LOG.debug("Cleanup event pack");
        r0.clear();
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x0077, code lost:
    
        r4.pipeListener.onIngest();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 257
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: co.cask.cdap.filetailer.sink.FileTailerSink.run():void");
    }

    private void uploadEventPack(EventPack eventPack) throws InterruptedException, IOException {
        List<FileTailerEvent> events = eventPack.getEvents();
        UploadLatch uploadLatch = new UploadLatch(events.size());
        Iterator<FileTailerEvent> it = events.iterator();
        while (it.hasNext()) {
            uploadEvent(uploadLatch, it.next());
        }
        uploadLatch.await();
        if (uploadLatch.isSuccessful()) {
            return;
        }
        LOG.debug("Failed to upload {} events ", Integer.valueOf(uploadLatch.getFailedEvents().size()));
        throw new IOException("Failed to upload events!");
    }

    private void uploadEvent(UploadLatch uploadLatch, FileTailerEvent fileTailerEvent) throws IOException {
        uploadEvent(uploadLatch, fileTailerEvent, 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uploadEvent(UploadLatch uploadLatch, FileTailerEvent fileTailerEvent, int i) throws IOException {
        LOG.debug("Uploading event {} with writer {}. Attempt {} out of {} ", new Object[]{fileTailerEvent, this.writer, Integer.valueOf(i), Integer.valueOf(MAX_RETRY_COUNT)});
        Futures.addCallback(this.writer.write(fileTailerEvent.getEventData(), fileTailerEvent.getCharset()), new WriteCallback(fileTailerEvent, uploadLatch, MAX_RETRY_COUNT, i, System.currentTimeMillis()));
    }
}
