package org.apache.apex.malhar.flume.sink;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.NetletThrowable;
import com.datatorrent.netlet.util.Slice;
import java.io.IOError;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor;
import org.apache.apex.malhar.flume.sink.Server;
import org.apache.apex.malhar.flume.storage.EventCodec;
import org.apache.apex.malhar.flume.storage.Storage;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/flume/sink/FlumeSink.class */
public class FlumeSink extends AbstractSink implements Configurable {
    /** Configuration key under which the listen host name is looked up. */
    private static final String HOSTNAME_STRING = "hostname";
    /** Host name used when {@link #HOSTNAME_STRING} is not configured (was misspelled "locahost"). */
    private static final String HOSTNAME_DEFAULT = "localhost";
    /** Default for the "acceptedTolerance" configuration key. */
    private static final long ACCEPTED_TOLERANCE = 20000;

    /** Event loop created in {@code start()} and shut down in {@code stop()}. */
    private DefaultEventLoop eventloop;
    /** Server registered with the event loop; its {@code requests} are drained by {@code process()}. */
    private Server server;
    /** Incremented per event written to the client; reduced by the consumed count on WINDOWED requests. */
    private int outstandingEventsCount;
    /** Event count reported by the most recent WINDOWED request. */
    private int lastConsumedEventsCount;
    /** Idle count reported by the most recent WINDOWED request. */
    private int idleCount;
    /** Next stored event to replay to the client, or {@code null} when there is nothing to replay. */
    private byte[] playback;
    /** Client that issued the last SEEK request; {@code null} while nobody is consuming. */
    private Server.Client client;
    /** Host name passed to the event loop when the server is started. */
    private String hostname;
    /** Port passed to the event loop when the server is started. */
    private int port;
    /** Identifier of this sink; falls back to the sink name when not configured. */
    private String id;
    /** Tolerance value handed to the {@link Server} constructor. */
    private long acceptedTolerance;
    /** Pause, in milliseconds, used by {@code sleep()}. */
    private long sleepMillis;
    /** Fraction by which the per-transaction batch size is grown or shrunk. */
    private double throughputAdjustmentFactor;
    /** Batch size used when the computed size is not positive. */
    private int minimumEventsPerTransaction;
    /** Upper bound for the per-transaction batch size. */
    private int maximumEventsPerTransaction;
    /** Maximum time since the last SEEK/COMMITTED request before {@code process()} backs off. */
    private long commitEventTimeoutMillis;
    /** Wall-clock time (ms) at which the last SEEK or COMMITTED request was handled. */
    private transient long lastCommitEventTimeMillis;
    /** Storage used to persist events and to retrieve them for replay. */
    private Storage storage;
    /** Discovery agent handed to the {@link Server}. */
    Discovery<byte[]> discovery;
    /** Codec that serializes Flume events before they are stored and sent. */
    StreamCodec<Event> codec;
    private static final Logger logger = LoggerFactory.getLogger(FlumeSink.class);

    /*
     * Decompiler note: this class was originally named
     * org.apache.apex.malhar.flume.sink.FlumeSink$3 (not a valid source-level name)
     * and was loaded from org/apache/apex/malhar/flume/sink/FlumeSink$3.class.
     */
    /**
     * Lookup table that translates {@link Server.Command} ordinals into the
     * case labels 1..6 used by the switch statement in {@code process()}.
     * Slots that are never assigned keep the array default of 0, which matches
     * no case label and therefore lands in the switch's {@code default} branch.
     */
    static /* synthetic */ class AnonymousClass3 {
        // Indexed by Command.ordinal(); sized to hold every constant known at class-load time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command = new int[Server.Command.values().length];

        static {
            // Each constant is mapped in its own try block so that a constant
            // missing at runtime (NoSuchFieldError) only leaves its own slot
            // unmapped instead of aborting the whole initializer.
            try {
                $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command[Server.Command.SEEK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command[Server.Command.COMMITTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command[Server.Command.CONNECTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command[Server.Command.DISCONNECTED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command[Server.Command.WINDOWED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$flume$sink$Server$Command[Server.Command.SERVER_ERROR.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /**
     * Runs one processing cycle of the sink.
     *
     * <p>The cycle first applies all requests queued on the server, then backs
     * off if no client is attached or if the client has not issued a SEEK or
     * COMMITTED request within {@code commitEventTimeoutMillis}. Otherwise it
     * computes a batch size and either replays events from storage (when a
     * playback is pending) or moves events from the Flume channel to the client
     * inside a channel transaction.
     *
     * @return {@code BACKOFF} when there is no client, the client timed out, or
     *         delivery failed; {@code READY} otherwise
     * @throws EventDeliveryException declared by the sink contract; not thrown directly here
     * @throws IOError when a SERVER_ERROR request is found in the queue
     */
    public Sink.Status process() throws EventDeliveryException {
        handlePendingRequests();

        if (this.client == null) {
            logger.info("No client expressed interest yet to consume the events.");
            return Sink.Status.BACKOFF;
        }

        final long sinceLastCommit = System.currentTimeMillis() - this.lastCommitEventTimeMillis;
        if (sinceLastCommit > this.commitEventTimeoutMillis) {
            logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.", sinceLastCommit);
            return Sink.Status.BACKOFF;
        }

        final int maxTuples = computeMaxTuples();
        if (maxTuples <= 0) {
            // A non-positive minimum can still yield an empty batch; nothing to do then.
            return Sink.Status.READY;
        }
        return this.playback != null ? replayFromStorage(maxTuples) : forwardFromChannel(maxTuples);
    }

    /** Applies every request queued on the server, in order, and then clears the queue. */
    private void handlePendingRequests() {
        synchronized (this.server.requests) {
            for (Server.Request request : this.server.requests) {
                logger.debug("found {}", request);
                switch (request.type) {
                    case SEEK:
                        this.lastCommitEventTimeMillis = System.currentTimeMillis();
                        this.playback = this.storage.retrieve(addressOf(request));
                        this.client = request.client;
                        break;
                    case COMMITTED:
                        this.lastCommitEventTimeMillis = System.currentTimeMillis();
                        this.storage.clean(addressOf(request));
                        break;
                    case CONNECTED:
                        logger.debug("Connected received, ignoring it!");
                        break;
                    case DISCONNECTED:
                        // Only forget the client if it is the one currently being served.
                        if (request.client == this.client) {
                            this.client = null;
                            this.outstandingEventsCount = 0;
                        }
                        break;
                    case WINDOWED:
                        this.lastConsumedEventsCount = request.getEventCount();
                        this.idleCount = request.getIdleCount();
                        this.outstandingEventsCount -= this.lastConsumedEventsCount;
                        break;
                    case SERVER_ERROR:
                        throw new IOError(null);
                    default:
                        logger.debug("Cannot understand the request {}", request);
                        break;
                }
            }
            this.server.requests.clear();
        }
    }

    /** Copies the address bytes referenced by the request's slice into a fresh array. */
    private static byte[] addressOf(Server.Request request) {
        final Slice address = request.getAddress();
        return Arrays.copyOfRange(address.buffer, address.offset, address.offset + address.length);
    }

    /** Derives the batch size for this cycle from the client's last reported consumption. */
    private int computeMaxTuples() {
        int maxTuples;
        if (this.outstandingEventsCount < 0) {
            // Client consumed more than was accounted for: grow the batch.
            maxTuples = this.idleCount > 1
                ? (int) ((1.0d + (this.throughputAdjustmentFactor * this.idleCount)) * this.lastConsumedEventsCount)
                : (int) ((1.0d + this.throughputAdjustmentFactor) * this.lastConsumedEventsCount);
        } else if (this.outstandingEventsCount > this.lastConsumedEventsCount) {
            // Client is falling behind: shrink the batch.
            maxTuples = (int) ((1.0d - this.throughputAdjustmentFactor) * this.lastConsumedEventsCount);
        } else if (this.idleCount > 0) {
            maxTuples = (int) ((1.0d + (this.throughputAdjustmentFactor * this.idleCount)) * this.lastConsumedEventsCount);
            if (maxTuples <= 0) {
                maxTuples = this.minimumEventsPerTransaction;
            }
        } else {
            maxTuples = this.lastConsumedEventsCount;
        }

        if (maxTuples >= this.maximumEventsPerTransaction) {
            maxTuples = this.maximumEventsPerTransaction;
        } else if (maxTuples <= 0) {
            maxTuples = this.minimumEventsPerTransaction;
        }
        return maxTuples;
    }

    /** Replays up to {@code maxTuples} stored events to the client, starting at the pending playback. */
    private Sink.Status replayFromStorage(int maxTuples) {
        try {
            int replayed = 0;
            do {
                if (!this.client.write(this.playback)) {
                    retryWrite(this.playback, null);
                }
                this.outstandingEventsCount++;
                this.playback = this.storage.retrieveNext();
                // Stop at the batch limit; the remaining playback resumes on the next cycle.
            } while (++replayed < maxTuples && this.playback != null);
        } catch (Exception ex) {
            logger.warn("Playback Failed", ex);
            if (ex instanceof NetletThrowable) {
                disconnectClient();
            }
            return Sink.Status.BACKOFF;
        }
        return Sink.Status.READY;
    }

    /** Takes up to {@code maxTuples} events from the channel, stores them and writes them to the client. */
    private Sink.Status forwardFromChannel(int maxTuples) {
        int storedTuples = 0;
        final Transaction transaction = getChannel().getTransaction();
        try {
            transaction.begin();
            Event event;
            while (storedTuples < maxTuples && (event = getChannel().take()) != null) {
                final Slice serialized = this.codec.toByteArray(event);
                final byte[] address = this.storage.store(serialized);
                if (address != null) {
                    if (!this.client.write(address, serialized)) {
                        retryWrite(address, serialized);
                    }
                    this.outstandingEventsCount++;
                } else {
                    logger.debug("Detected the condition of recovery from flume crash!");
                }
                storedTuples++;
            }
            if (storedTuples > 0) {
                this.storage.flush();
            }
            transaction.commit();
            if (storedTuples > 0) {
                logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}", new Object[]{maxTuples, storedTuples, this.outstandingEventsCount});
            }
        } catch (Error er) {
            transaction.rollback();
            throw er;
        } catch (Exception ex) {
            logger.error("Transaction Failed", ex);
            if ((ex instanceof NetletThrowable.NetletRuntimeException) && this.client != null) {
                disconnectClient();
            }
            transaction.rollback();
            return Sink.Status.BACKOFF;
        } finally {
            // Closed exactly once, whatever the outcome.
            transaction.close();
        }

        if (storedTuples == 0) {
            // Channel was empty: pause briefly instead of spinning.
            sleep();
        }
        return Sink.Status.READY;
    }

    /** Disconnects the current client and always resets the client bookkeeping, even if disconnect fails. */
    private void disconnectClient() {
        try {
            this.eventloop.disconnect(this.client);
        } finally {
            this.client = null;
            this.outstandingEventsCount = 0;
        }
    }

    /**
     * Pauses the calling thread for {@code sleepMillis} milliseconds.
     * If the thread is interrupted while waiting, the interrupt flag is set
     * again so that callers can still observe the interruption.
     */
    private void sleep() {
        final long pauseMillis = this.sleepMillis;
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
            final Thread self = Thread.currentThread();
            self.interrupt();
        }
    }

    /**
     * Starts the sink: sets up the storage, discovery and codec plug-ins that
     * implement {@link Component}, creates the event loop and the server, starts
     * listening on the configured host and port, and finally starts the parent sink.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised during startup
     */
    public void start() {
        try {
            // The plug-in interfaces do not declare setup(); it is only reachable
            // through Component, so each plug-in is cast before the call.
            if (this.storage instanceof Component) {
                ((Component<?>) this.storage).setup(null);
            }
            if (this.discovery instanceof Component) {
                ((Component<?>) this.discovery).setup(null);
            }
            if (this.codec instanceof Component) {
                ((Component<?>) this.codec).setup(null);
            }

            this.eventloop = new DefaultEventLoop("EventLoop-" + this.id);
            this.server = new Server(this.id, this.discovery, this.acceptedTolerance);
            this.eventloop.start();
            this.eventloop.start(this.hostname, this.port, this.server);
            super.start();
        } catch (IOException ex) {
            // Errors and runtime exceptions propagate unchanged; only the checked
            // exception needs wrapping.
            throw new RuntimeException(ex);
        }
    }

    /**
     * Stops the sink: stops the parent sink, disconnects the current client (if
     * any), shuts down the server and the event loop, and tears down the codec,
     * discovery and storage plug-ins that implement {@link Component}.
     *
     * <p>The cleanup runs exactly once, whether or not {@code super.stop()}
     * throws. If both {@code super.stop()} and the cleanup fail, the cleanup
     * failure is the one that propagates.
     */
    public void stop() {
        try {
            super.stop();
        } finally {
            if (this.client != null) {
                this.eventloop.disconnect(this.client);
                this.client = null;
            }
            this.eventloop.stop(this.server);
            this.eventloop.stop();

            // teardown() is declared on Component, not on the plug-in interfaces.
            if (this.codec instanceof Component) {
                ((Component<?>) this.codec).teardown();
            }
            if (this.discovery instanceof Component) {
                ((Component<?>) this.discovery).teardown();
            }
            if (this.storage instanceof Component) {
                ((Component<?>) this.storage).teardown();
            }
        }
    }

    /**
     * Reads the sink settings from the Flume context and instantiates the
     * optional discovery, storage and codec plug-ins.
     *
     * <p>When no discovery class is configured, a stand-in that only logs is
     * used. When no storage class is configured, a no-op storage is used. When
     * no codec class is configured, an {@link EventCodec} is used.
     *
     * @param context Flume configuration for this sink
     */
    public void configure(org.apache.flume.Context context) {
        hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
        port = context.getInteger("port", 0);

        // Fall back to the sink's own name when no explicit id is configured.
        final String configuredId = context.getString(Storage.ID);
        id = configuredId != null ? configuredId : getName();

        acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE);
        sleepMillis = context.getLong("sleepMillis", 5L);
        throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0d;
        maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
        minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
        commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);

        @SuppressWarnings("unchecked")
        final Discovery<byte[]> configuredDiscovery = configure("discovery", Discovery.class, context);
        if (configuredDiscovery != null) {
            discovery = configuredDiscovery;
        } else {
            logger.warn("Discovery agent not configured for the sink!");
            // Stand-in agent: logs advertise/unadvertise calls and discovers nothing.
            discovery = new Discovery<byte[]>() {
                @Override
                public void unadvertise(Discovery.Service<byte[]> service) {
                    logger.debug("Sink {} stopped listening on {}:{}", new Object[]{service.getId(), service.getHost(), service.getPort()});
                }

                @Override
                public void advertise(Discovery.Service<byte[]> service) {
                    logger.debug("Sink {} started listening on {}:{}", new Object[]{service.getId(), service.getHost(), service.getPort()});
                }

                @Override
                public Collection<Discovery.Service<byte[]>> discover() {
                    return Collections.<Discovery.Service<byte[]>>emptySet();
                }
            };
        }

        final Storage configuredStorage = configure("storage", Storage.class, context);
        storage = configuredStorage;
        if (configuredStorage == null) {
            logger.warn("storage key missing... FlumeSink may lose data!");
            // No-op storage: stores nothing and never has anything to replay.
            storage = new Storage() {
                @Override
                public byte[] store(Slice slice) {
                    return null;
                }

                @Override
                public byte[] retrieve(byte[] bArr) {
                    return null;
                }

                @Override
                public byte[] retrieveNext() {
                    return null;
                }

                @Override
                public void clean(byte[] bArr) {
                }

                @Override
                public void flush() {
                }
            };
        }

        @SuppressWarnings("unchecked")
        final StreamCodec<Event> configuredCodec = configure("codec", StreamCodec.class, context);
        if (configuredCodec != null) {
            codec = configuredCodec;
        } else {
            codec = new EventCodec();
        }
    }

    /**
     * Instantiates the plug-in class named under the given configuration key.
     *
     * <p>The class is loaded through the thread context class loader and created
     * with its no-argument constructor. If the instance is {@link Configurable},
     * it is configured with the sub-properties found under {@code str + '.'};
     * when those do not carry their own {@code Storage.ID}, the sink-level value
     * of that key is copied in.
     *
     * @param str     configuration key holding the class name (also the sub-property prefix)
     * @param cls     type the configured class must be assignable to
     * @param context sink configuration
     * @param <T>     plug-in type
     * @return the new instance, or {@code null} when the key is not set
     * @throws Error            when the configured class is not assignable to {@code cls}
     * @throws RuntimeException wrapping any checked failure while loading or instantiating the class
     */
    private static <T> T configure(String str, Class<T> cls, org.apache.flume.Context context) {
        final String className = context.getString(str);
        if (className == null) {
            return null;
        }

        try {
            final Class<?> loaded = Thread.currentThread().getContextClassLoader().loadClass(className);
            if (!cls.isAssignableFrom(loaded)) {
                // Report the type that was actually required, not always Storage.
                logger.error("key class {} does not implement {} interface", className, cls.getCanonicalName());
                throw new Error("Invalid " + str + " " + className);
            }

            final T instance = loaded.asSubclass(cls).newInstance();
            if (instance instanceof Configurable) {
                final org.apache.flume.Context subContext = new org.apache.flume.Context(context.getSubProperties(str + '.'));
                if (subContext.getString(Storage.ID) == null) {
                    final String inheritedId = context.getString(Storage.ID);
                    logger.debug("{} inherited id={} from sink", str, inheritedId);
                    subContext.put(Storage.ID, inheritedId);
                }
                ((Configurable) instance).configure(subContext);
            }
            return instance;
        } catch (Error error) {
            throw error;
        } catch (RuntimeException runtime) {
            throw runtime;
        } catch (Throwable cause) {
            // Only checked failures reach this point; surface them unchecked.
            throw new RuntimeException(cause);
        }
    }

    /** @return the host name the server is started on */
    String getHostname() {
        return hostname;
    }

    /** @param str the host name the server is started on */
    void setHostname(String str) {
        hostname = str;
    }

    /** @return the port the server is started on */
    int getPort() {
        return port;
    }

    /** @param i the port the server is started on */
    void setPort(int i) {
        port = i;
    }

    /** @return the tolerance value handed to the server when it is created */
    public long getAcceptedTolerance() {
        return acceptedTolerance;
    }

    /** @param j the tolerance value handed to the server when it is created */
    public void setAcceptedTolerance(long j) {
        acceptedTolerance = j;
    }

    /** @return the discovery agent used by this sink */
    Discovery<byte[]> getDiscovery() {
        return discovery;
    }

    /** @param discovery the discovery agent used by this sink */
    void setDiscovery(Discovery<byte[]> discovery) {
        this.discovery = discovery;
    }

    /**
     * Keeps retrying a write that the client did not accept, pausing between
     * attempts, for as long as the client stays connected.
     *
     * <p>Reconstructed from the bytecode of the original method, which the
     * decompiler could not restructure (the previous body only threw
     * {@link UnsupportedOperationException}).
     *
     * @param address bytes to write; written alone when {@code event} is {@code null}
     * @param event   payload written together with {@code address}, or {@code null}
     * @throws IOException with message "Client disconnected!" when the client is
     *                     no longer connected before a write succeeds
     */
    private void retryWrite(byte[] address, Slice event) throws IOException {
        if (event == null) {
            while (this.client.isConnected()) {
                sleep();
                if (this.client.write(address)) {
                    return;
                }
            }
        } else {
            while (this.client.isConnected()) {
                sleep();
                if (this.client.write(address, event)) {
                    return;
                }
            }
        }
        throw new IOException("Client disconnected!");
    }
}
