package org.apache.streams.youtube.provider;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.youtube.YouTube;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.google.gplus.configuration.UserInfo;
import org.apache.streams.util.ComponentUtils;
import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
import org.apache.streams.youtube.YoutubeConfiguration;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/youtube/provider/YoutubeProvider.class */
/**
 * Base {@link StreamsProvider} for YouTube data.
 *
 * <p>For every user listed in the {@link YoutubeConfiguration}, {@link #startStream()} submits one
 * collector task (supplied by the subclass through {@link #getDataCollector}) to a fixed-size
 * thread pool. The collectors are handed a bounded queue; {@link #readCurrent()} drains that queue
 * in batches.
 *
 * <p>Expected call order, as far as this class shows: {@link #prepare(Object)}, then
 * {@link #startStream()}, then repeated {@link #readCurrent()} / {@link #isRunning()}, and finally
 * {@link #cleanUp()}.
 */
public abstract class YoutubeProvider implements StreamsProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class);
    private static final String STREAMS_ID = "YoutubeProvider";
    /** Capacity of the shared datum queue and the maximum size of one {@link #readCurrent()} batch. */
    private static final int MAX_BATCH_SIZE = 1000;
    /** Number of worker threads used to run the per-user collectors. */
    private static final int DEFAULT_THREAD_POOL_SIZE = 5;
    private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
    private static final JsonFactory JSON_FACTORY = new JacksonFactory();

    protected YouTube youtube;
    protected YoutubeConfiguration config;
    /** OAuth scopes requested for the service account. */
    private final List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube");
    /** One future per submitted collector; {@code submit(Runnable)} yields {@code ListenableFuture<?>}. */
    private final List<ListenableFuture<?>> futures = new ArrayList<>();
    private ListeningExecutorService executor;
    private BlockingQueue<StreamsDatum> datumQueue;
    private AtomicBoolean isComplete;
    private boolean previousPullWasEmpty;

    /**
     * Creates a provider whose configuration is detected from the {@code "youtube"} section of the
     * global Streams configuration.
     *
     * @throws NullPointerException if the detected configuration has no API key
     */
    public YoutubeProvider() {
        this.config = (YoutubeConfiguration) new ComponentConfigurator(YoutubeConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
        Objects.requireNonNull(this.config.getApiKey(), "YouTube configuration must provide an API key");
    }

    /**
     * Creates a provider that uses the supplied configuration.
     *
     * @param youtubeConfiguration configuration to use; must not be null and must carry an API key
     * @throws NullPointerException if the configuration or its API key is null
     */
    public YoutubeProvider(YoutubeConfiguration youtubeConfiguration) {
        this.config = Objects.requireNonNull(youtubeConfiguration, "youtubeConfiguration must not be null");
        Objects.requireNonNull(this.config.getApiKey(), "YouTube configuration must provide an API key");
    }

    /**
     * Returns the fixed identifier of this provider.
     *
     * @return the string {@code "YoutubeProvider"}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Builds the YouTube client and allocates the executor, the bounded datum queue and the
     * completion flag.
     *
     * @param obj not used by this implementation
     * @throws RuntimeException wrapping the {@link IOException} or {@link GeneralSecurityException}
     *     raised while creating the YouTube client
     */
    public void prepare(Object obj) {
        try {
            this.youtube = createYouTubeClient();
            this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
            this.datumQueue = new LinkedBlockingQueue<>(MAX_BATCH_SIZE);
            this.isComplete = new AtomicBoolean(false);
            this.previousPullWasEmpty = false;
        } catch (IOException | GeneralSecurityException e) {
            // The throwable is passed as the trailing argument so SLF4J logs its stack trace;
            // a "{}" placeholder would not be filled by it.
            LOGGER.error("Failed to create oauth for YouTube", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Submits one collector per configured user and then shuts the executor down so that it
     * terminates once all submitted collectors have finished.
     *
     * <p>Users without their own after/before date inherit the configuration defaults when those
     * defaults are set.
     */
    public void startStream() {
        // NOTE(review): a single strategy instance is shared by every collector, exactly as before;
        // confirm that ExponentialBackOffStrategy is meant to be shared across threads.
        BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
        for (UserInfo userInfo : this.config.getYoutubeUsers()) {
            if (this.config.getDefaultAfterDate() != null && userInfo.getAfterDate() == null) {
                userInfo.setAfterDate(this.config.getDefaultAfterDate());
            }
            if (this.config.getDefaultBeforeDate() != null && userInfo.getBeforeDate() == null) {
                userInfo.setBeforeDate(this.config.getDefaultBeforeDate());
            }
            this.futures.add(this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, userInfo)));
        }
        // No further tasks are accepted; already-submitted collectors keep running.
        this.executor.shutdown();
    }

    /**
     * Supplies the task that collects data for one user.
     *
     * @param backOffStrategy back-off strategy created by {@link #startStream()}
     * @param blockingQueue queue that {@link #readCurrent()} drains
     * @param youTube client created in {@link #prepare(Object)}
     * @param userInfo the user to collect for, with default dates already applied
     * @return the runnable submitted to the executor
     */
    protected abstract Runnable getDataCollector(BackOffStrategy backOffStrategy, BlockingQueue<StreamsDatum> blockingQueue, YouTube youTube, UserInfo userInfo);

    /**
     * Moves up to {@code MAX_BATCH_SIZE} datums from the shared queue into a new result set.
     *
     * @return a result set backed by a fresh queue holding the drained datums (possibly empty)
     */
    public StreamsResultSet readCurrent() {
        LinkedBlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
        int batchSize = 0;
        while (!this.datumQueue.isEmpty() && batchSize < MAX_BATCH_SIZE) {
            StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
            // A null poll result is skipped without counting towards the batch.
            if (datum != null) {
                batchSize++;
                ComponentUtils.offerUntilSuccess(datum, batch);
            }
        }
        return new StreamsResultSet(batch);
    }

    /**
     * Not implemented by this provider.
     *
     * @param bigInteger ignored
     * @return always {@code null}
     */
    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    /**
     * Not implemented by this provider.
     *
     * @param dateTime ignored
     * @param dateTime2 ignored
     * @return always {@code null}
     */
    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    /**
     * Builds a YouTube client authenticated as the configured service account.
     *
     * <p>The P12 private key is attached only when the configured path is non-empty and points to
     * an existing, readable regular file; otherwise the credential is built without it.
     *
     * @return the configured YouTube client
     * @throws IOException propagated from loading the key file or building the credential
     * @throws GeneralSecurityException propagated from loading the key file or building the credential
     */
    @VisibleForTesting
    protected YouTube createYouTubeClient() throws IOException, GeneralSecurityException {
        GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder()
                .setTransport(HTTP_TRANSPORT)
                .setJsonFactory(JSON_FACTORY)
                .setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
                .setServiceAccountScopes(this.scopes);
        String p12Path = getConfig().getOauth().getPathToP12KeyFile();
        if (StringUtils.isNotEmpty(p12Path)) {
            File p12KeyFile = new File(p12Path);
            if (p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) {
                credentialBuilder = credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
            } else {
                // Behaviour is unchanged (the key is skipped); the warning only makes the skip visible.
                LOGGER.warn("P12 key file {} is missing or unreadable; building credential without it", p12Path);
            }
        }
        return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, credentialBuilder.build()).setApplicationName("Streams Application").build();
    }

    /**
     * Shuts the executor down through {@link ComponentUtils#shutdownExecutor} and releases the
     * reference. Calling it again, or before {@link #prepare(Object)}, is a no-op.
     */
    public void cleanUp() {
        if (this.executor != null) {
            // The two 10s are passed through to the helper unchanged; presumably wait times — confirm.
            ComponentUtils.shutdownExecutor(this.executor, 10, 10);
            this.executor = null;
        }
    }

    /**
     * Returns the configuration currently in use.
     *
     * @return the configuration object held by this provider
     */
    public YoutubeConfiguration getConfig() {
        return this.config;
    }

    /**
     * Replaces the configuration.
     *
     * @param youtubeConfiguration the new configuration
     */
    public void setConfig(YoutubeConfiguration youtubeConfiguration) {
        this.config = youtubeConfiguration;
    }

    /**
     * Sets the default before-date on the configuration.
     *
     * @param dateTime the new default before-date
     */
    public void setDefaultBeforeDate(DateTime dateTime) {
        this.config.setDefaultBeforeDate(dateTime);
    }

    /**
     * Sets the default after-date on the configuration.
     *
     * @param dateTime the new default after-date
     */
    public void setDefaultAfterDate(DateTime dateTime) {
        this.config.setDefaultAfterDate(dateTime);
    }

    /**
     * Replaces the configured users with one entry per id, each carrying the configuration's
     * current default after/before dates.
     *
     * @param set user ids to collect for
     */
    public void setUserInfoWithDefaultDates(Set<String> set) {
        List<UserInfo> users = new LinkedList<>();
        for (String userId : set) {
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(userId);
            userInfo.setAfterDate(this.config.getDefaultAfterDate());
            userInfo.setBeforeDate(this.config.getDefaultBeforeDate());
            users.add(userInfo);
        }
        this.config.setYoutubeUsers(users);
    }

    /**
     * Replaces the configured users with one entry per map key, using the mapped value as that
     * user's after-date. No before-date is set here.
     *
     * @param map user id to after-date
     */
    public void setUserInfoWithAfterDate(Map<String, DateTime> map) {
        List<UserInfo> users = new LinkedList<>();
        for (Map.Entry<String, DateTime> entry : map.entrySet()) {
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(entry.getKey());
            userInfo.setAfterDate(entry.getValue());
            users.add(userInfo);
        }
        this.config.setYoutubeUsers(users);
    }

    /**
     * Reports whether the provider may still produce data.
     *
     * <p>The provider is marked complete once the queue is empty, the executor has terminated and
     * every submitted future is done. After {@link #cleanUp()} the executor reference is null,
     * which is treated as terminated instead of failing with a {@link NullPointerException}.
     *
     * @return {@code false} once completion has been detected, {@code true} otherwise
     */
    public boolean isRunning() {
        ListeningExecutorService currentExecutor = this.executor;
        boolean executorTerminated = currentExecutor == null || currentExecutor.isTerminated();
        if (this.datumQueue.isEmpty() && executorTerminated && Futures.allAsList(this.futures).isDone()) {
            LOGGER.info("Completed");
            this.isComplete.set(true);
            LOGGER.info("Exiting");
        }
        return !this.isComplete.get();
    }
}
