package co.cask.cdap.gateway.collector;

import co.cask.cdap.common.discovery.EndpointStrategy;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.common.discovery.TimeLimitEndpointStrategy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/gateway/collector/FlumeAdapter.class */
class FlumeAdapter extends AbstractIdleService implements AvroSourceProtocol.Callback {
    private static final Logger LOG = LoggerFactory.getLogger(FlumeAdapter.class);
    private final DiscoveryServiceClient discoveryClient;
    private EndpointStrategy endpointStrategy;

    @Inject
    FlumeAdapter(DiscoveryServiceClient discoveryServiceClient) {
        this.discoveryClient = discoveryServiceClient;
    }

    protected void startUp() throws Exception {
        this.endpointStrategy = new TimeLimitEndpointStrategy(new RandomEndpointStrategy(this.discoveryClient.discover("streams")), 1L, TimeUnit.SECONDS);
    }

    protected void shutDown() throws Exception {
    }

    /* JADX WARN: Finally extract failed */
    public void append(AvroFlumeEvent avroFlumeEvent, Callback<Status> callback) throws IOException {
        try {
            Discoverable pick = this.endpointStrategy.pick();
            if (pick == null) {
                callback.handleError(new IllegalStateException("No stream endpoint available. Unable to write to stream."));
                return;
            }
            TreeMap newTreeMap = Maps.newTreeMap();
            String createHeaders = createHeaders(avroFlumeEvent, newTreeMap);
            HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(String.format("http://%s:%d/v2/streams/%s", pick.getSocketAddress().getHostName(), Integer.valueOf(pick.getSocketAddress().getPort()), createHeaders)).openConnection();
            try {
                httpURLConnection.setDoOutput(true);
                for (Map.Entry<String, String> entry : newTreeMap.entrySet()) {
                    String key = entry.getKey();
                    if (!"X-ApiKey".equals(key) && !"X-Destination".equals(key)) {
                        key = createHeaders + "." + key;
                    }
                    httpURLConnection.setRequestProperty(key, entry.getValue());
                }
                WritableByteChannel newChannel = Channels.newChannel(httpURLConnection.getOutputStream());
                try {
                    ByteBuffer duplicate = avroFlumeEvent.getBody().duplicate();
                    while (duplicate.hasRemaining()) {
                        newChannel.write(duplicate);
                    }
                    newChannel.close();
                    int responseCode = httpURLConnection.getResponseCode();
                    Preconditions.checkState(responseCode == 200, "Status != 200 OK (%s)", new Object[]{Integer.valueOf(responseCode)});
                    callback.handleResult(Status.OK);
                    httpURLConnection.disconnect();
                } catch (Throwable th) {
                    newChannel.close();
                    throw th;
                }
            } catch (Throwable th2) {
                httpURLConnection.disconnect();
                throw th2;
            }
        } catch (Exception e) {
            LOG.error("Error consuming single event", e);
            callback.handleError(e);
        }
    }

    public void appendBatch(List<AvroFlumeEvent> list, Callback<Status> callback) throws IOException {
        final AtomicReference atomicReference = new AtomicReference();
        Callback<Status> callback2 = new Callback<Status>() { // from class: co.cask.cdap.gateway.collector.FlumeAdapter.1
            public void handleResult(Status status) {
            }

            public void handleError(Throwable th) {
                atomicReference.compareAndSet(null, th);
            }
        };
        Iterator<AvroFlumeEvent> it = list.iterator();
        while (it.hasNext()) {
            append(it.next(), callback2);
        }
        Throwable th = (Throwable) atomicReference.get();
        if (th == null) {
            callback.handleResult(Status.OK);
        } else {
            callback.handleError(th);
        }
    }

    public Status append(AvroFlumeEvent avroFlumeEvent) throws AvroRemoteException {
        CallFuture callFuture = new CallFuture();
        try {
            append(avroFlumeEvent, callFuture);
            return (Status) callFuture.get();
        } catch (Exception e) {
            throw new AvroRemoteException(e);
        }
    }

    public Status appendBatch(List<AvroFlumeEvent> list) throws AvroRemoteException {
        CallFuture callFuture = new CallFuture();
        try {
            appendBatch(list, callFuture);
            return (Status) callFuture.get();
        } catch (Exception e) {
            throw new AvroRemoteException(e);
        }
    }

    private String createHeaders(AvroFlumeEvent avroFlumeEvent, Map<String, String> map) {
        String str = null;
        for (Map.Entry entry : avroFlumeEvent.getHeaders().entrySet()) {
            String obj = ((CharSequence) entry.getKey()).toString();
            String obj2 = ((CharSequence) entry.getValue()).toString();
            map.put(obj, obj2);
            if ("X-Destination".equals(obj)) {
                str = obj2;
            }
        }
        if (str == null) {
            throw new IllegalArgumentException("Missing header 'X-Destination'");
        }
        return str;
    }
}
