package org.apache.apex.malhar.flume.operator;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.util.Slice;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;
import org.apache.apex.malhar.flume.sink.Server;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.class */
public abstract class AbstractFlumeInputOperator<T> implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.IdleTimeHandler, Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>> {

    @NotNull
    private StreamCodec<Event> codec;
    private transient int idleCounter;
    private transient int eventCounter;
    private transient DefaultEventLoop eventloop;
    private volatile transient boolean connected;
    private transient Context.OperatorContext context;
    private transient AbstractFlumeInputOperator<T>.Client client;
    private transient long windowId;
    private transient byte[] address;
    private transient long maxEventsPerWindow;
    private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus = new ThreadLocal<HashMap<Integer, ConnectionStatus>>() { // from class: org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public HashMap<Integer, ConnectionStatus> initialValue() {
            return new HashMap<>();
        }
    };
    private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses = new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>() { // from class: org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator.2
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public HashMap<String, ArrayList<RecoveryAddress>> initialValue() {
            return new HashMap<>();
        }
    };
    protected static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks = new ThreadLocal<>();
    private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class);
    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<>();
    private transient ArrayBlockingQueue<Slice> handoverBuffer = new ArrayBlockingQueue<>(5120);

    @NotNull
    private String[] connectionSpecs = new String[0];
    private final ArrayList<RecoveryAddress> recoveryAddresses = new ArrayList<>();

    @Min(0)
    private long maxEventsPerSecond = Long.MAX_VALUE;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator$Client.class */
    public class Client extends AbstractLengthPrependerClient {
        private final String id;

        Client(String str) {
            this.id = str;
        }

        public void onMessage(byte[] bArr, int i, int i2) {
            try {
                AbstractFlumeInputOperator.this.handoverBuffer.put(new Slice(bArr, i, i2));
            } catch (InterruptedException e) {
                handleException(e, AbstractFlumeInputOperator.this.eventloop);
            }
        }

        public void connected() {
            byte[] bArr;
            super.connected();
            synchronized (AbstractFlumeInputOperator.this.recoveryAddresses) {
                bArr = AbstractFlumeInputOperator.this.recoveryAddresses.size() > 0 ? ((RecoveryAddress) AbstractFlumeInputOperator.this.recoveryAddresses.get(AbstractFlumeInputOperator.this.recoveryAddresses.size() - 1)).address : new byte[8];
            }
            byte[] bArr2 = new byte[17];
            bArr2[0] = Server.Command.SEEK.getOrdinal();
            System.arraycopy(bArr, 0, bArr2, 1, 8);
            Server.writeLong(bArr2, 9, System.currentTimeMillis());
            write(bArr2);
            AbstractFlumeInputOperator.this.connected = true;
            ConnectionStatus connectionStatus = new ConnectionStatus();
            connectionStatus.connected = true;
            connectionStatus.spec = AbstractFlumeInputOperator.this.connectionSpecs[0];
            synchronized (AbstractFlumeInputOperator.this.context) {
                AbstractFlumeInputOperator.logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
                AbstractFlumeInputOperator.this.context.setCounters(connectionStatus);
            }
        }

        public void disconnected() {
            AbstractFlumeInputOperator.this.connected = false;
            ConnectionStatus connectionStatus = new ConnectionStatus();
            connectionStatus.connected = false;
            connectionStatus.spec = AbstractFlumeInputOperator.this.connectionSpecs[0];
            synchronized (AbstractFlumeInputOperator.this.context) {
                AbstractFlumeInputOperator.logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
                AbstractFlumeInputOperator.this.context.setCounters(connectionStatus);
            }
            super.disconnected();
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator$ConnectionStatus.class */
    public static class ConnectionStatus implements Serializable {
        int id;
        String spec;
        boolean connected;
        private static final long serialVersionUID = 201312261615L;

        public int hashCode() {
            return this.spec.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ConnectionStatus connectionStatus = (ConnectionStatus) obj;
            return this.spec == null ? connectionStatus.spec == null : this.spec.equals(connectionStatus.spec);
        }

        public String toString() {
            return "ConnectionStatus{id=" + this.id + ", spec=" + this.spec + ", connected=" + this.connected + '}';
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator$RecoveryAddress.class */
    private static class RecoveryAddress implements Serializable {
        long windowId;
        byte[] address;
        private static final long serialVersionUID = 201312021432L;

        private RecoveryAddress() {
        }

        public String toString() {
            return "RecoveryAddress{windowId=" + this.windowId + ", address=" + Arrays.toString(this.address) + '}';
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof RecoveryAddress)) {
                return false;
            }
            RecoveryAddress recoveryAddress = (RecoveryAddress) obj;
            if (this.windowId != recoveryAddress.windowId) {
                return false;
            }
            return Arrays.equals(this.address, recoveryAddress.address);
        }

        public int hashCode() {
            return (31 * ((int) (this.windowId ^ (this.windowId >>> 32)))) + (this.address != null ? Arrays.hashCode(this.address) : 0);
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator$ZKStatsListner.class */
    public static class ZKStatsListner extends ZKAssistedDiscovery implements StatsListener, Serializable {
        long intervalMillis = 60000;
        private final StatsListener.Response response = new StatsListener.Response();
        private transient long nextMillis;
        private static final long serialVersionUID = 201312241646L;

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            HashMap hashMap = (HashMap) AbstractFlumeInputOperator.partitionedInstanceStatus.get();
            this.response.repartitionRequired = false;
            Object obj = null;
            for (Stats.OperatorStats operatorStats : batchedOperatorStats.getLastWindowedStats()) {
                if (operatorStats.counters != null) {
                    obj = operatorStats.counters;
                    AbstractFlumeInputOperator.logger.debug("Received custom stats = {}", obj);
                }
            }
            if (obj instanceof ConnectionStatus) {
                ConnectionStatus connectionStatus = (ConnectionStatus) obj;
                hashMap.put(Integer.valueOf(batchedOperatorStats.getOperatorId()), connectionStatus);
                if (!connectionStatus.connected) {
                    AbstractFlumeInputOperator.logger.debug("setting repatitioned = true because of lastStat = {}", obj);
                    this.response.repartitionRequired = true;
                }
            }
            if (System.currentTimeMillis() >= this.nextMillis) {
                AbstractFlumeInputOperator.logger.debug("nextMillis = {}", Long.valueOf(this.nextMillis));
                try {
                    try {
                        super.setup(null);
                        try {
                            Collection<Discovery.Service<byte[]>> discover = discover();
                            super.teardown();
                            AbstractFlumeInputOperator.discoveredFlumeSinks.set(discover);
                            AbstractFlumeInputOperator.logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", hashMap, discover);
                            switch (discover.size()) {
                                case 0:
                                    this.response.repartitionRequired = hashMap.size() != 1;
                                    break;
                                default:
                                    if (discover.size() != hashMap.size()) {
                                        this.response.repartitionRequired = true;
                                        break;
                                    } else {
                                        for (ConnectionStatus connectionStatus2 : hashMap.values()) {
                                            if (connectionStatus2 == null || !connectionStatus2.connected) {
                                                this.response.repartitionRequired = true;
                                                break;
                                            }
                                        }
                                    }
                                    break;
                            }
                            this.nextMillis = System.currentTimeMillis() + this.intervalMillis;
                            AbstractFlumeInputOperator.logger.debug("Proposed NextMillis = {}", Long.valueOf(this.nextMillis));
                        } catch (Throwable th) {
                            super.teardown();
                            throw th;
                        }
                    } catch (Throwable th2) {
                        this.nextMillis = System.currentTimeMillis() + this.intervalMillis;
                        AbstractFlumeInputOperator.logger.debug("Proposed NextMillis = {}", Long.valueOf(this.nextMillis));
                        throw th2;
                    }
                } catch (Error e) {
                    throw e;
                } catch (Throwable th3) {
                    AbstractFlumeInputOperator.logger.warn("Unable to discover services, using values from last successful discovery", th3);
                    this.nextMillis = System.currentTimeMillis() + this.intervalMillis;
                    AbstractFlumeInputOperator.logger.debug("Proposed NextMillis = {}", Long.valueOf(this.nextMillis));
                }
            }
            return this.response;
        }

        public long getIntervalMillis() {
            return this.intervalMillis;
        }

        public void setIntervalMillis(long j) {
            this.intervalMillis = j;
        }
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.maxEventsPerWindow = (long) (((((Integer) operatorContext.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)).intValue() * ((Integer) operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)).intValue()) / 1000.0d) * this.maxEventsPerSecond);
        logger.debug("max-events per-second {} per-window {}", Long.valueOf(this.maxEventsPerSecond), Long.valueOf(this.maxEventsPerWindow));
        try {
            this.eventloop = new DefaultEventLoop("EventLoop-" + operatorContext.getId());
            this.eventloop.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void activate(Context.OperatorContext operatorContext) {
        if (this.connectionSpecs.length == 0) {
            logger.info("Discovered zero FlumeSink");
        } else {
            if (this.connectionSpecs.length != 1) {
                throw new IllegalArgumentException(String.format("A physical %s operator cannot connect to more than 1 addresses!", getClass().getSimpleName()));
            }
            for (String str : this.connectionSpecs) {
                logger.debug("Connection spec is {}", str);
                String[] split = str.split(":");
                DefaultEventLoop defaultEventLoop = this.eventloop;
                InetSocketAddress inetSocketAddress = new InetSocketAddress(split[1], Integer.parseInt(split[2]));
                AbstractFlumeInputOperator<T>.Client client = new Client(split[0]);
                this.client = client;
                defaultEventLoop.connect(inetSocketAddress, client);
            }
        }
        this.context = operatorContext;
    }

    public void beginWindow(long j) {
        this.windowId = j;
        this.idleCounter = 0;
        this.eventCounter = 0;
    }

    public void emitTuples() {
        int size = this.handoverBuffer.size();
        if (size <= 0 || this.eventCounter >= this.maxEventsPerWindow) {
            return;
        }
        while (true) {
            size--;
            if (size <= 0 || this.eventCounter >= this.maxEventsPerWindow - 1) {
                break;
            }
            Slice poll = this.handoverBuffer.poll();
            poll.offset += 8;
            poll.length -= 8;
            T convert = convert((Event) this.codec.fromByteArray(poll));
            if (convert == null) {
                this.drop.emit(poll);
            } else {
                this.output.emit(convert);
            }
            this.eventCounter++;
        }
        Slice poll2 = this.handoverBuffer.poll();
        poll2.offset += 8;
        poll2.length -= 8;
        T convert2 = convert((Event) this.codec.fromByteArray(poll2));
        if (convert2 == null) {
            this.drop.emit(poll2);
        } else {
            this.output.emit(convert2);
        }
        this.eventCounter++;
        this.address = Arrays.copyOfRange(poll2.buffer, poll2.offset - 8, poll2.offset);
    }

    public void endWindow() {
        if (this.connected) {
            byte[] bArr = new byte[17];
            bArr[0] = Server.Command.WINDOWED.getOrdinal();
            Server.writeInt(bArr, 1, this.eventCounter);
            Server.writeInt(bArr, 5, this.idleCounter);
            Server.writeLong(bArr, 9, System.currentTimeMillis());
            logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", new Object[]{Server.Command.WINDOWED, Integer.valueOf(this.eventCounter), Integer.valueOf(this.idleCounter)});
            this.client.write(bArr);
        }
        if (this.address != null) {
            RecoveryAddress recoveryAddress = new RecoveryAddress();
            recoveryAddress.address = this.address;
            this.address = null;
            recoveryAddress.windowId = this.windowId;
            this.recoveryAddresses.add(recoveryAddress);
        }
    }

    public void deactivate() {
        if (this.connected) {
            this.eventloop.disconnect(this.client);
        }
        this.context = null;
    }

    public void teardown() {
        this.eventloop.stop();
        this.eventloop = null;
    }

    public void handleIdleTime() {
        this.idleCounter++;
        try {
            Thread.sleep(((Integer) this.context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public abstract T convert(Event event);

    public String[] getConnectAddresses() {
        return (String[]) this.connectionSpecs.clone();
    }

    public void setConnectAddresses(String[] strArr) {
        this.connectionSpecs = (String[]) strArr.clone();
    }

    public StreamCodec<Event> getCodec() {
        return this.codec;
    }

    public void setCodec(StreamCodec<Event> streamCodec) {
        this.codec = streamCodec;
    }

    public void checkpointed(long j) {
    }

    public void committed(long j) {
        if (this.connected) {
            synchronized (this.recoveryAddresses) {
                byte[] bArr = null;
                Iterator<RecoveryAddress> it = this.recoveryAddresses.iterator();
                while (it.hasNext()) {
                    RecoveryAddress next = it.next();
                    if (next.windowId > j) {
                        break;
                    }
                    it.remove();
                    if (next.address != null) {
                        bArr = next.address;
                    }
                }
                if (bArr != null) {
                    if (this.recoveryAddresses.isEmpty()) {
                        RecoveryAddress recoveryAddress = new RecoveryAddress();
                        recoveryAddress.address = bArr;
                        this.recoveryAddresses.add(recoveryAddress);
                    }
                    byte[] bArr2 = new byte[17];
                    bArr2[0] = Server.Command.COMMITTED.getOrdinal();
                    System.arraycopy(bArr, 0, bArr2, 1, 8);
                    Server.writeLong(bArr2, 9, System.currentTimeMillis());
                    logger.debug("wrote {} with recoveryOffset = {}", Server.Command.COMMITTED, Arrays.toString(bArr));
                    this.client.write(bArr2);
                }
            }
        }
    }

    public Collection<Partitioner.Partition<AbstractFlumeInputOperator<T>>> definePartitions(Collection<Partitioner.Partition<AbstractFlumeInputOperator<T>>> collection, Partitioner.PartitioningContext partitioningContext) {
        Collection<Discovery.Service<byte[]>> collection2 = discoveredFlumeSinks.get();
        if (collection2 == null) {
            return collection;
        }
        HashMap<String, ArrayList<RecoveryAddress>> hashMap = abandonedRecoveryAddresses.get();
        ArrayList arrayList = new ArrayList(collection.size());
        for (Partitioner.Partition<AbstractFlumeInputOperator<T>> partition : collection) {
            String[] strArr = ((AbstractFlumeInputOperator) partition.getPartitionedInstance()).connectionSpecs;
            arrayList.addAll(Arrays.asList(strArr));
            int length = strArr.length;
            while (true) {
                int i = length;
                length--;
                if (i > 0) {
                    hashMap.put(strArr[length].split(":", 2)[0], ((AbstractFlumeInputOperator) partition.getPartitionedInstance()).recoveryAddresses);
                }
            }
        }
        HashMap hashMap2 = new HashMap(collection2.size());
        for (Discovery.Service<byte[]> service : collection2) {
            String str = (String) hashMap2.get(service.getId());
            String str2 = service.getId() + ':' + service.getHost() + ':' + service.getPort();
            if (str == null) {
                hashMap2.put(service.getId(), str2);
            } else {
                boolean z = false;
                Iterator<ConnectionStatus> it = partitionedInstanceStatus.get().values().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    ConnectionStatus next = it.next();
                    if (str.equals(next.spec) && !next.connected) {
                        hashMap2.put(service.getId(), str2);
                        z = true;
                        break;
                    }
                }
                if (!z) {
                    logger.warn("2 sinks found with the same id: {} and {}... Ignoring previous.", str, str2);
                    hashMap2.put(service.getId(), str2);
                }
            }
        }
        int size = arrayList.size();
        while (true) {
            int i2 = size;
            size--;
            if (i2 <= 0) {
                break;
            }
            String str3 = (String) hashMap2.remove(((String) arrayList.get(size)).split(":")[0]);
            if (str3 == null) {
                arrayList.remove(size);
            } else {
                arrayList.set(size, str3);
            }
        }
        arrayList.addAll(hashMap2.values());
        collection.clear();
        try {
            if (!arrayList.isEmpty()) {
                long size2 = this.maxEventsPerSecond / arrayList.size();
                int size3 = arrayList.size();
                while (true) {
                    int i3 = size3;
                    size3--;
                    if (i3 <= 0) {
                        break;
                    }
                    AbstractFlumeInputOperator abstractFlumeInputOperator = (AbstractFlumeInputOperator) getClass().newInstance();
                    abstractFlumeInputOperator.setCodec(this.codec);
                    abstractFlumeInputOperator.setMaxEventsPerSecond(size2);
                    String str4 = (String) arrayList.get(size3);
                    abstractFlumeInputOperator.connectionSpecs = new String[]{str4};
                    ArrayList<RecoveryAddress> remove = hashMap.remove(str4.split(":", 2)[0]);
                    if (remove != null) {
                        abstractFlumeInputOperator.recoveryAddresses.addAll(remove);
                    }
                    collection.add(new DefaultPartition(abstractFlumeInputOperator));
                }
            } else {
                AbstractFlumeInputOperator abstractFlumeInputOperator2 = (AbstractFlumeInputOperator) getClass().newInstance();
                abstractFlumeInputOperator2.setCodec(this.codec);
                abstractFlumeInputOperator2.setMaxEventsPerSecond(this.maxEventsPerSecond);
                Iterator<ArrayList<RecoveryAddress>> it2 = hashMap.values().iterator();
                while (it2.hasNext()) {
                    abstractFlumeInputOperator2.recoveryAddresses.addAll(it2.next());
                }
                abstractFlumeInputOperator2.connectionSpecs = new String[arrayList.size()];
                int length2 = this.connectionSpecs.length;
                while (true) {
                    int i4 = length2;
                    length2--;
                    if (i4 <= 0) {
                        break;
                    }
                    this.connectionSpecs[length2] = (String) arrayList.get(length2);
                }
                collection.add(new DefaultPartition(abstractFlumeInputOperator2));
            }
            logger.debug("Requesting partitions: {}", collection);
            return collection;
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        } catch (InstantiationException e2) {
            throw new RuntimeException(e2);
        }
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractFlumeInputOperator<T>>> map) {
        logger.debug("Partitioned Map: {}", map);
        HashMap<Integer, ConnectionStatus> hashMap = partitionedInstanceStatus.get();
        hashMap.clear();
        for (Map.Entry<Integer, Partitioner.Partition<AbstractFlumeInputOperator<T>>> entry : map.entrySet()) {
            if (!hashMap.containsKey(entry.getKey())) {
                hashMap.put(entry.getKey(), null);
            }
        }
    }

    public String toString() {
        return "AbstractFlumeInputOperator{connected=" + this.connected + ", connectionSpecs=" + (this.connectionSpecs.length == 0 ? "empty" : this.connectionSpecs[0]) + ", recoveryAddresses=" + this.recoveryAddresses + '}';
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof AbstractFlumeInputOperator)) {
            return false;
        }
        AbstractFlumeInputOperator abstractFlumeInputOperator = (AbstractFlumeInputOperator) obj;
        if (Arrays.equals(this.connectionSpecs, abstractFlumeInputOperator.connectionSpecs)) {
            return this.recoveryAddresses.equals(abstractFlumeInputOperator.recoveryAddresses);
        }
        return false;
    }

    public int hashCode() {
        return (31 * (this.connectionSpecs != null ? Arrays.hashCode(this.connectionSpecs) : 0)) + this.recoveryAddresses.hashCode();
    }

    public void setMaxEventsPerSecond(long j) {
        this.maxEventsPerSecond = j;
    }

    public long getMaxEventsPerSecond() {
        return this.maxEventsPerSecond;
    }
}
