package com.github.brandtg.switchboard;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/brandtg/switchboard/MysqlLogPuller.class */
/**
 * Base class that wires a {@link LogReceiver} and a {@link LogPuller} to an in-process pipe.
 *
 * <p>On {@link #start()} the receiver is constructed with the sink address, a Netty event loop
 * group and the write end of the pipe ({@link #outputStream}); the puller is constructed with the
 * source address, sink address, database name and last index, registered as a listener on the
 * receiver, and submitted to its own single-thread executor. The {@link Runnable} returned by
 * {@link #getCallback()} is submitted to a second single-thread executor (presumably it reads
 * from {@link #inputStream} -- confirm against the concrete subclasses).
 *
 * <p>{@link #start()} and {@link #shutdown()} are guarded by {@link #isStarted} so that repeated
 * calls are no-ops. Because a {@link PipedOutputStream} can only be connected once, an instance
 * whose pipe has already been connected cannot be started again: the attempt fails with the
 * exception thrown by {@code connect}.
 */
public abstract class MysqlLogPuller {
    // NOTE(review): ENCODING and OBJECT_MAPPER are not referenced anywhere in this class.
    private static final String ENCODING = "UTF-8";
    private static final Logger LOG = LoggerFactory.getLogger(MysqlLogPuller.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Database name handed to the {@link LogPuller}. */
    protected final String database;
    /** Last index handed to the {@link LogPuller}; the three-argument constructor passes -1. */
    protected final long lastIndex;
    /** Address handed to the {@link LogPuller} as its source. */
    protected final InetSocketAddress sourceAddress;
    /** Address handed to both the {@link LogReceiver} and the {@link LogPuller} as the sink. */
    protected final InetSocketAddress sinkAddress;
    /** Read end of the pipe; connected to {@link #outputStream} in {@link #start()}. */
    protected final PipedInputStream inputStream;
    /** Write end of the pipe; handed to the {@link LogReceiver}. */
    protected final PipedOutputStream outputStream;
    /** True between a successful {@link #start()} and the next {@link #shutdown()}. */
    protected final AtomicBoolean isStarted;
    protected EventLoopGroup eventExecutors;
    protected ExecutorService callbackExecutor;
    protected ExecutorService refreshExecutor;
    protected LogReceiver logReceiver;
    protected LogPuller logPuller;

    /**
     * Creates a puller with a last index of -1.
     *
     * @param str the database name
     * @param inetSocketAddress the source address
     * @param inetSocketAddress2 the sink address
     */
    public MysqlLogPuller(String str, InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        this(str, inetSocketAddress, inetSocketAddress2, -1L);
    }

    /**
     * Creates a puller; no threads or network resources are allocated until {@link #start()}.
     *
     * @param str the database name
     * @param inetSocketAddress the source address
     * @param inetSocketAddress2 the sink address
     * @param j the last index passed on to the {@link LogPuller}
     */
    public MysqlLogPuller(String str, InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, long j) {
        this.database = str;
        this.lastIndex = j;
        this.sourceAddress = inetSocketAddress;
        this.sinkAddress = inetSocketAddress2;
        this.inputStream = new PipedInputStream();
        this.outputStream = new PipedOutputStream();
        this.isStarted = new AtomicBoolean();
    }

    /**
     * Returns the task that {@link #start()} submits to the callback executor.
     *
     * @return the callback to run on the callback executor
     */
    protected abstract Runnable getCallback();

    /**
     * Connects the pipe, starts the receiver, submits the callback and starts the puller.
     *
     * <p>Does nothing if this instance is already started. If any step fails, everything that was
     * allocated by this call is released again and {@link #isStarted} is reset, so the failure is
     * not mistaken for a running instance.
     *
     * @throws Exception if connecting the pipe or starting any component fails
     */
    public void start() throws Exception {
        if (this.isStarted.getAndSet(true)) {
            return;
        }

        try {
            this.outputStream.connect(this.inputStream);
        } catch (Exception e) {
            // Nothing has been allocated by this call yet, so only the flag needs resetting.
            this.isStarted.set(false);
            throw e;
        }

        boolean succeeded = false;
        try {
            this.callbackExecutor = Executors.newSingleThreadExecutor();
            this.refreshExecutor = Executors.newSingleThreadExecutor();
            this.eventExecutors = new NioEventLoopGroup();
            this.logReceiver = new LogReceiver(this.sinkAddress, this.eventExecutors, this.outputStream);
            this.logReceiver.start();
            this.callbackExecutor.submit(getCallback());
            this.logPuller = new LogPuller(this.sourceAddress, this.sinkAddress, this.database, this.lastIndex);
            this.logReceiver.registerListener(this.logPuller);
            this.refreshExecutor.submit(this.logPuller);
            succeeded = true;
        } finally {
            if (!succeeded) {
                // Runs for exceptions and errors alike; the original failure keeps propagating.
                rollbackFailedStart();
            }
        }
    }

    /**
     * Stops the puller and the receiver and shuts down the event loop group and both executors.
     *
     * <p>Does nothing if this instance is not started. A failure while stopping the receiver is
     * logged; a failure while stopping the puller is propagated, but only after all remaining
     * resources have been released.
     *
     * @throws Exception if stopping the puller fails
     */
    public void shutdown() throws Exception {
        if (!this.isStarted.getAndSet(false)) {
            return;
        }
        releaseResources();
    }

    /** Releases whatever a failed {@link #start()} managed to allocate and clears the started flag. */
    private void rollbackFailedStart() {
        try {
            releaseResources();
        } catch (Exception e) {
            // Logged rather than thrown so it cannot mask the failure that triggered the rollback.
            LOG.error("Exception while rolling back failed start", e);
        } finally {
            this.isStarted.set(false);
        }
    }

    /** Stops every component that has been created; null-safe so it also works after a partial start. */
    private void releaseResources() throws Exception {
        try {
            if (this.logPuller != null) {
                this.logPuller.shutdown();
            }
        } finally {
            if (this.logReceiver != null) {
                try {
                    this.logReceiver.shutdown();
                } catch (Exception e) {
                    LOG.error("Exception while shutting down log receiver", e);
                }
            }
            if (this.eventExecutors != null) {
                this.eventExecutors.shutdownGracefully();
            }
            if (this.callbackExecutor != null) {
                this.callbackExecutor.shutdown();
            }
            if (this.refreshExecutor != null) {
                this.refreshExecutor.shutdown();
            }
        }
    }
}
