package com.cloudera.nav.mapreduce.yarn;

import com.cloudera.cdx.extractor.model.Entity;
import com.cloudera.nav.core.model.EntityHolder;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.core.model.SourceType;
import com.cloudera.nav.extract.UtilityIdGenerator;
import com.cloudera.nav.mapreduce.AbstractMRPoller;
import com.cloudera.nav.mapreduce.CDXMRExtractorContext;
import com.cloudera.nav.mapreduce.model.Job;
import com.cloudera.nav.mapreduce.model.JobExecution;
import com.cloudera.nav.utils.SourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.configuration.MapConfiguration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/mapreduce/yarn/CDXYarnPoller.class */
public class CDXYarnPoller extends AbstractMRPoller<CDXYarnPollerState> {
    private static final Logger LOG = LoggerFactory.getLogger(CDXYarnPoller.class);
    private final CDXMRExtractorContext context;
    private Map<Source, String> sourceToExtractorRunId;

    public CDXYarnPoller(CDXMRExtractorContext cDXMRExtractorContext, Map<Source, String> map, String str) {
        super(cDXMRExtractorContext, CDXYarnPollerState.class, str);
        this.context = cDXMRExtractorContext;
        this.sourceToExtractorRunId = map;
    }

    @Override // com.cloudera.nav.mapreduce.AbstractMRPoller
    public CDXYarnPollerState runImpl(CDXYarnPollerState cDXYarnPollerState) {
        CDXYarnPollerState cDXYarnPollerState2 = cDXYarnPollerState == null ? new CDXYarnPollerState() : cDXYarnPollerState;
        extractMRJobs(cDXYarnPollerState2);
        return cDXYarnPollerState2;
    }

    private int pollSize() {
        return this.context.getOptions().getCDXOptions().getMaxPollSize();
    }

    private int pollTimeout() {
        return this.context.getOptions().getCDXOptions().getMaxPollTimeout();
    }

    @VisibleForTesting
    void extractMRJobs(CDXYarnPollerState cDXYarnPollerState) {
        Optional poll;
        int i = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= pollSize()) {
                return;
            }
            try {
                poll = this.context.getCdxImporter().poll(pollTimeout());
            } catch (IOException e) {
                LOG.error("CDX yarn polling failed with %s", e);
                Throwables.propagate(e);
            }
            if (!poll.isPresent()) {
                return;
            }
            Entity entity = (Entity) poll.get();
            if (entity instanceof com.cloudera.cdx.extractor.model.YarnJob) {
                com.cloudera.cdx.extractor.model.YarnJob yarnJob = (com.cloudera.cdx.extractor.model.YarnJob) entity;
                LOG.trace("Got message " + yarnJob.getId());
                collectMRJob(yarnJob);
            } else {
                LOG.info("Unknown entity type returned by cdx importer for yarn polling. Skipping: " + entity.getClass().getName());
            }
        }
    }

    @VisibleForTesting
    JobExecution jobExecutionFromYarnJob(com.cloudera.cdx.extractor.model.YarnJob yarnJob) {
        JobExecution jobExecution = new JobExecution();
        jobExecution.setJobID(yarnJob.getId());
        jobExecution.setPrincipal(yarnJob.getUser());
        jobExecution.setStarted(new Instant(yarnJob.getStartTime()));
        jobExecution.setEnded(new Instant(yarnJob.getFinishTime()));
        jobExecution.setOriginalName(jobExecution.getJobID());
        jobExecution.setSourceType(SourceType.YARN);
        return jobExecution;
    }

    @VisibleForTesting
    boolean collectMRJob(com.cloudera.cdx.extractor.model.YarnJob yarnJob) {
        String nextExtractorRunId;
        try {
            MapConfiguration translateConf = translateConf(yarnJob.getJobConf());
            String sourceId = yarnJob.getSourceId();
            Source transientSourceByIdentity = this.context.getSourceManager().getTransientSourceByIdentity(sourceId);
            if (transientSourceByIdentity == null) {
                throw new SourceNotFoundException(sourceId);
            }
            if (this.sourceToExtractorRunId.containsKey(transientSourceByIdentity)) {
                nextExtractorRunId = this.sourceToExtractorRunId.get(transientSourceByIdentity);
            } else {
                nextExtractorRunId = UtilityIdGenerator.getNextExtractorRunId(transientSourceByIdentity);
                this.sourceToExtractorRunId.put(transientSourceByIdentity, nextExtractorRunId);
            }
            EntityHolder<Job> orCreateJob = getOrCreateJob(yarnJob.getName(), translateConf, transientSourceByIdentity);
            orCreateJob.getEntity().setSourceType(SourceType.YARN);
            JobExecution jobExecutionFromYarnJob = jobExecutionFromYarnJob(yarnJob);
            LOG.info("YARN CDX: Processing job id" + jobExecutionFromYarnJob.getJobID());
            extract(orCreateJob, EntityHolder.withInstance(jobExecutionFromYarnJob).build(), translateConf, transientSourceByIdentity, nextExtractorRunId);
            return true;
        } catch (RuntimeException e) {
            LOG.error("Error extracting Yarn MapReduce Job {}: Exception {}", yarnJob.getId(), e);
            return false;
        }
    }

    @VisibleForTesting
    static MapConfiguration translateConf(Map<String, String> map) {
        return new MapConfiguration(map);
    }
}
