package com.cloudera.cdx.extractor.spark;

import com.cloudera.cdx.extractor.ServiceExtractionTask;
import com.cloudera.cdx.extractor.model.TelemetryPublisherCountersMap;
import com.cloudera.cdx.extractor.spark.SafariAnalysisPollerState;
import com.cloudera.cdx.extractor.util.ExtractorUtil;
import com.cloudera.cmf.cdhclient.util.ThrottlingLogger;
import com.cloudera.cmf.version.VersionString;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.SimpleTimeZone;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/extractor/spark/SafariSparkPoller.class */
public class SafariSparkPoller implements ServiceExtractionTask {
    // Not referenced anywhere in this class.
    private static final int DES_WAIT_TIME_IN_SECONDS = 30;
    // Not referenced anywhere in this class.
    private static final String IN_PROGRESS_EVENTS = ".in_progress_events";
    /** Suffix appended to the service's CDX id to build the state-store key used by loadState/saveState. */
    public static final String SAFARI_SUFFIX = "safari";
    /** Name of the manifest file written into each application's analysis directory. */
    private static final String MANIFEST_FILE_NAME = "safariManifestFile";
    // Size threshold consulted by isFileSizeWithInTheWXMRange.
    // NOTE(review): with the value 0 that check accepts every file — confirm the intended limit.
    private static final long MAX_SIZE_ALLOWED_FOR_WXM = 0;
    /** Manifest key whose value is the service's CDX id. */
    public static final String SPARK_SERVICE_NAME = "sparkServiceName";
    /** Manifest key whose value is the Spark application id. */
    public static final String SPARK_APPLICATION_ID = "applicationId";
    /** First argument passed to the SparkHistoryClient application queries. */
    public static final String COMPLETED = "completed";
    /** Sub-directory of the configured storage directory that holds the per-application analysis directories. */
    private static final String SUB_DIR = "safariSparkPoller/";
    /** Extraction context; checked for null in the constructor. */
    private final SparkExtractorContext context;
    /** Helper queried for safari enablement/completion and used to fetch analysis files; not null-checked. */
    private final SafariVerificationHelper safariVerificationHelper;
    private static final Logger LOG = LoggerFactory.getLogger(SafariSparkPoller.class);
    // Wraps LOG in a ThrottlingLogger configured with a 15-minute duration.
    private static final Logger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(15));
    /** CDH versions at or above this always use the end-time based application query. */
    private static final VersionString MIN_CDH_VERSION_SUPPORTED = VersionString.of("6.0");
    // Not referenced anywhere in this class.
    private static TelemetryPublisherCountersMap tpCounters = TelemetryPublisherCountersMap.getInstance();

    /**
     * Formats the poller start time passed to the Spark History Server queries. Its calendar is
     * switched to GMT in the static initializer at the bottom of this class.
     * Note: java.text.SimpleDateFormat is not thread-safe and this instance is shared.
     */
    @VisibleForTesting
    static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'");

    /**
     * Creates a poller that works against the given extractor context.
     *
     * @param sparkExtractorContext extraction context; must not be null
     * @param safariVerificationHelper helper used for safari checks and for fetching analysis
     *     files; stored as given, without a null check
     * @throws NullPointerException if {@code sparkExtractorContext} is null
     */
    public SafariSparkPoller(SparkExtractorContext sparkExtractorContext, SafariVerificationHelper safariVerificationHelper) {
        Preconditions.checkNotNull(sparkExtractorContext);
        this.context = sparkExtractorContext;
        this.safariVerificationHelper = safariVerificationHelper;
    }

    /**
     * Computes the instant from which applications are requested.
     *
     * <p>The result is {@code min(now - lookback, max(now - maxLookback, lastFinishedTime))}:
     * the last finished time is clamped so that it is never older than the maximum lookback
     * and never newer than the regular lookback.
     *
     * @param safariAnalysisPollerState state supplying the last finished time
     * @param instant the reference "now"
     * @return the start instant for the poll
     */
    @VisibleForTesting
    Instant getStartTimeForPoller(SafariAnalysisPollerState safariAnalysisPollerState, Instant instant) {
        long lookbackStart = instant.minus(this.context.getOptions().getShsPollerLookbackPeriod()).getMillis();
        long maxLookbackStart = instant.minus(this.context.getOptions().getShsPollerMaxLookbackPeriod()).getMillis();
        long resumePoint = Math.max(maxLookbackStart, safariAnalysisPollerState.getLastFinishedTime());
        return new Instant(Math.min(lookbackStart, resumePoint));
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    /**
     * Runs one polling pass, using the current time as the reference instant.
     *
     * @return the poller state after the pass
     */
    public SafariAnalysisPollerState m3run() {
        Instant now = Instant.now();
        return runImpl(now);
    }

    /**
     * Executes one polling pass: loads the state, fetches applications from the Spark History
     * Server, records the ones not yet in the state, publishes analysis files for applications
     * whose safari analysis has completed, and prunes expired state entries.
     *
     * <p>The event-log exporter and the history-server client are always closed afterwards,
     * whether the pass succeeded or threw. The client is closed even if closing the exporter
     * itself fails.
     *
     * @param instant the reference "now" used to compute the polling start time
     * @return the poller state after the pass
     */
    @VisibleForTesting
    SafariAnalysisPollerState runImpl(Instant instant) {
        SafariAnalysisPollerState loadState = loadState();
        SparkHistoryClient sparkHistoryClient = null;
        try {
            sparkHistoryClient = createShsClient();
            List<SparkApplication> fetchedApps = fetchNewSparkApplications(loadState, instant, sparkHistoryClient);
            LOG.info("There are {} spark applications received from Spark History Server for spark service {}.", Integer.valueOf(fetchedApps.size()), this.context.getService().getName());
            long addedCount = processNewSparkApplications(loadState, fetchedApps);
            LOG.info("There are {} new spark applications that are not in local state.", Long.valueOf(addedCount));
            publishAnalysisFilestoWXM(getSafariAnalysisCompletedApplications(loadState));
            deleteFilesOlderThanStateRetentionPeriod(loadState);
            return loadState;
        } finally {
            try {
                this.context.getEventLogExporter().close(this.context.getOptions().getExporterShutdownSeconds(), TimeUnit.SECONDS);
            } finally {
                // Nested finally: a failure while closing the exporter must not leak the client.
                if (sparkHistoryClient != null) {
                    closeClient(sparkHistoryClient);
                }
            }
        }
    }

    /**
     * Removes every application entry whose age (current time minus its state creation time)
     * exceeds the configured safari state retention period, then saves the state.
     *
     * <p>The state is saved even when nothing was removed.
     *
     * @param safariAnalysisPollerState the state to prune and persist
     */
    @VisibleForTesting
    void deleteFilesOlderThanStateRetentionPeriod(SafariAnalysisPollerState safariAnalysisPollerState) {
        Iterator<SafariAnalysisPollerState.SafariAppState> it = safariAnalysisPollerState.getLastExportedApps().iterator();
        long currentTimeMillis = System.currentTimeMillis();
        while (it.hasNext()) {
            SafariAnalysisPollerState.SafariAppState next = it.next();
            if (currentTimeMillis - next.getStateCreationTime() > this.context.getOptions().getSafariStateRetentionPeriod()) {
                it.remove();
                // Log at the same level the guard checks; previously this was an info call
                // hidden behind a debug guard.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Deleting the app state for {} with creation time of {}  from safari poller state.", next.getAppId(), Long.valueOf(next.getStateCreationTime()));
                }
            }
        }
        saveState(safariAnalysisPollerState);
    }

    /**
     * Publishes the analysis files of each given application, one after another.
     *
     * @param list applications whose safari analysis has completed; may be empty
     */
    private void publishAnalysisFilestoWXM(List<SafariAnalysisPollerState.SafariAppState> list) {
        if (!list.isEmpty()) {
            LOG.info("There are {} spark applications with safari analysis completed on them.", Integer.valueOf(list.size()));
            for (SafariAnalysisPollerState.SafariAppState appState : list) {
                publishAnalysisFiles(appState);
            }
        } else {
            LOG.info("There are no spark applications that have safari analysis completed on them.");
        }
    }

    /**
     * Fetches the analysis files of one application into its own directory, writes the manifest,
     * archives the directory and publishes the archive.
     *
     * <p>Any failure is logged and recorded on the application state as
     * {@code SAFARI_ENABLED_ANALYSIS_COMPLETED_ERROR_PUBLISHING}; nothing is rethrown, so the
     * caller can continue with the remaining applications.
     *
     * @param safariAppState the application to publish
     */
    private void publishAnalysisFiles(SafariAnalysisPollerState.SafariAppState safariAppState) {
        try {
            File pollerDir = new File(this.context.getOptions().getStorageDirectory(), SUB_DIR);
            File analysisDir = new File(pollerDir, "desAnalysis_" + safariAppState.getAppId());
            this.safariVerificationHelper.fetchAnalysisFiles(safariAppState.getAppId(), analysisDir);
            generateManifestFile(safariAppState, analysisDir);
            File archive = generateArchiveAndReturnArchiveFile(safariAppState, analysisDir);
            publishTarBallToWXM(safariAppState, archive);
        } catch (Throwable failure) {
            safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_COMPLETED_ERROR_PUBLISHING);
            String message = String.format("Error encountered in processing the spark application %s with creation time %s for the service %s. Will continue with the rest of the applications.", safariAppState.getAppId(), Long.valueOf(safariAppState.getStateCreationTime()), this.context.getServiceName());
            LOG.error(message, failure);
        }
    }

    /**
     * Sends the archive through the event-log exporter when it passes the size check, otherwise
     * logs a warning and skips it.
     *
     * <p>In both cases the application is marked
     * {@code SAFARI_ENABLED_ANALYSIS_COMPLETED_AND_PUBLISHED} afterwards.
     *
     * @param safariAppState the application the archive belongs to
     * @param file the archive to publish
     */
    private void publishTarBallToWXM(SafariAnalysisPollerState.SafariAppState safariAppState, File file) {
        LOG.info("Publishing the tarball for the application {} for the service {} to WXM.", safariAppState.getAppId(), this.context.getServiceName());
        if (!isFileSizeWithInTheWXMRange(file)) {
            LOG.warn("Skipped publishing the tarball for the application {} for serice {} to WXM as the tar ball is too large to publish to WXM.", safariAppState.getAppId(), this.context.getServiceName());
        } else {
            this.context.getEventLogExporter().send(file);
            LOG.info("Published the tarball for the application {} for the service {} to WXM.", safariAppState.getAppId(), this.context.getServiceName());
        }
        safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_COMPLETED_AND_PUBLISHED);
    }

    /**
     * Tells whether the file is small enough to be published.
     *
     * <p>A non-positive {@code MAX_SIZE_ALLOWED_FOR_WXM} is treated as "no limit", which keeps
     * the current behaviour (the limit is 0 and every file is accepted). With a positive limit
     * the file is accepted only when its length does not exceed it; the previous {@code >=}
     * comparison would have accepted only files at or above the maximum.
     *
     * @param file the archive to check
     * @return {@code true} if the file may be published
     */
    private boolean isFileSizeWithInTheWXMRange(File file) {
        return MAX_SIZE_ALLOWED_FOR_WXM <= 0 || file.length() <= MAX_SIZE_ALLOWED_FOR_WXM;
    }

    /**
     * Compresses the application's analysis directory via
     * {@code ExtractorUtil.compressAndDeleteDir} and returns the resulting archive.
     *
     * @param safariAppState the application the directory belongs to (used for logging)
     * @param file the analysis directory to archive
     * @return the archive file
     * @throws RuntimeException wrapping the {@link IOException} if archiving fails
     */
    private File generateArchiveAndReturnArchiveFile(SafariAnalysisPollerState.SafariAppState safariAppState, File file) {
        try {
            File archive = ExtractorUtil.compressAndDeleteDir(file);
            if (LOG.isDebugEnabled()) {
                LOG.debug("generated the archive file {} with size {}.", archive.getName(), Long.valueOf(archive.length()));
            }
            return archive;
        } catch (IOException e) {
            // The app id is passed as an argument (it used to be concatenated onto the pattern,
            // which shifted the placeholders); the trailing exception is logged with its stack trace.
            LOG.error("Failed in generating the archive file for the application {} in the service {}.", safariAppState.getAppId(), this.context.getServiceName(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the manifest file into the given directory. The manifest has two
     * {@code key = value} lines: the service's CDX id and the application id.
     *
     * <p>The stream is closed even when the write fails, and the content is encoded as UTF-8
     * rather than the platform default charset.
     *
     * @param safariAppState the application the manifest describes
     * @param file the directory that receives the manifest
     * @throws IOException if the manifest cannot be created or written
     */
    private void generateManifestFile(SafariAnalysisPollerState.SafariAppState safariAppState, File file) throws IOException {
        StringBuilder sb = new StringBuilder();
        sb.append(SPARK_SERVICE_NAME).append(" = ").append(this.context.getService().getCdxId()).append(System.lineSeparator());
        sb.append(SPARK_APPLICATION_ID).append(" = ").append(safariAppState.getAppId()).append(System.lineSeparator());
        byte[] content = sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        try (FileOutputStream fileOutputStream = new FileOutputStream(new File(file, MANIFEST_FILE_NAME))) {
            fileOutputStream.write(content);
        }
        LOG.info("generated the manifest file for the app {} in the service {}.", safariAppState.getAppId(), this.context.getServiceName());
    }

    /**
     * Re-evaluates the safari status of every tracked application that is still in the
     * {@code NEW_APP} or {@code SAFARI_ENABLED_ANALYSIS_NOT_COMPLETED} state and returns those
     * whose analysis is now completed. The state is saved before returning.
     *
     * @param safariAnalysisPollerState the state whose applications are examined and updated
     * @return the applications whose safari analysis has completed
     */
    @VisibleForTesting
    List<SafariAnalysisPollerState.SafariAppState> getSafariAnalysisCompletedApplications(SafariAnalysisPollerState safariAnalysisPollerState) {
        List<SafariAnalysisPollerState.SafariAppState> completedApps = safariAnalysisPollerState.getLastExportedApps().stream()
                .filter(this::neeedsVerificationForSafariEnabledOrNot)
                .map(this::resolveSafariStatus)
                .filter(this::isSafariAnalyisCompleted)
                .collect(Collectors.toList());
        LOG.info("There are a total of {} spark applications that need safari status verifications", Integer.valueOf(completedApps.size()));
        saveState(safariAnalysisPollerState);
        return completedApps;
    }

    /**
     * Tells whether the application's analysis has completed, i.e. its state is either
     * {@code SAFARI_ENABLED_ANALYSIS_COMPLETED} or
     * {@code SAFARI_ENABLED_ANALYSIS_COMPLETED_ERROR_PUBLISHING}.
     *
     * @param safariAppState the application state to inspect
     * @return {@code true} for either of the two states above
     */
    public boolean isSafariAnalyisCompleted(SafariAnalysisPollerState.SafariAppState safariAppState) {
        SafariAnalysisPollerState.SafariState current = safariAppState.getSafariState();
        if (current == SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_COMPLETED) {
            return true;
        }
        return current == SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_COMPLETED_ERROR_PUBLISHING;
    }

    /**
     * Advances the application's safari state by asking the verification helper.
     *
     * <ul>
     *   <li>{@code NEW_APP}: becomes {@code SAFARI_NOT_ENABLED} when safari is not enabled for
     *       the application; otherwise becomes completed or not-completed depending on whether
     *       the analysis has finished.</li>
     *   <li>{@code SAFARI_ENABLED_ANALYSIS_NOT_COMPLETED}: becomes completed once the analysis
     *       has finished, otherwise stays unchanged.</li>
     *   <li>{@code SAFARI_NOT_ENABLED}, {@code SAFARI_ENABLED_ANALYSIS_COMPLETED} and
     *       {@code SAFARI_ENABLED_ANALYSIS_COMPLETED_AND_PUBLISHED}: left unchanged.</li>
     *   <li>Any other state: a warning is logged and the state is left unchanged.</li>
     * </ul>
     *
     * @param safariAppState the application state to update in place
     * @return the same instance that was passed in
     */
    private SafariAnalysisPollerState.SafariAppState resolveSafariStatus(SafariAnalysisPollerState.SafariAppState safariAppState) {
        switch (safariAppState.getSafariState()) {
            case NEW_APP:
                if (this.safariVerificationHelper.isSafariEnabledApplication(safariAppState.getAppId())) {
                    if (this.safariVerificationHelper.isSafariAnalyisCompleted(safariAppState.getAppId())) {
                        safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_COMPLETED);
                    } else {
                        safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_NOT_COMPLETED);
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Safari is not enabled for the application {}.", safariAppState.getAppId());
                    }
                    safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.SAFARI_NOT_ENABLED);
                }
                break;
            case SAFARI_ENABLED_ANALYSIS_NOT_COMPLETED:
                if (this.safariVerificationHelper.isSafariAnalyisCompleted(safariAppState.getAppId())) {
                    safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_COMPLETED);
                }
                break;
            case SAFARI_NOT_ENABLED:
            case SAFARI_ENABLED_ANALYSIS_COMPLETED:
            case SAFARI_ENABLED_ANALYSIS_COMPLETED_AND_PUBLISHED:
                // Nothing left to resolve for these states.
                break;
            default:
                LOG.warn("Received unknown safari state {} for the application {}.", safariAppState.getSafariState().name(), safariAppState.getAppId());
                break;
        }
        return safariAppState;
    }

    /**
     * Tells whether the application's safari status still has to be checked, i.e. its state is
     * {@code NEW_APP} or {@code SAFARI_ENABLED_ANALYSIS_NOT_COMPLETED}.
     *
     * @param safariAppState the application state to inspect
     * @return {@code true} if the status needs verification
     */
    private boolean neeedsVerificationForSafariEnabledOrNot(SafariAnalysisPollerState.SafariAppState safariAppState) {
        SafariAnalysisPollerState.SafariState current = safariAppState.getSafariState();
        boolean needsVerification = current == SafariAnalysisPollerState.SafariState.NEW_APP
                || current == SafariAnalysisPollerState.SafariState.SAFARI_ENABLED_ANALYSIS_NOT_COMPLETED;
        if (LOG.isDebugEnabled()) {
            LOG.debug("The application {} needs safari verfication: {}", safariAppState.getAppId(), Boolean.valueOf(needsVerification));
        }
        return needsVerification;
    }

    /**
     * Adds a {@code NEW_APP} entry to the state for every fetched application that is not
     * tracked yet, then saves the state.
     *
     * <p>An explicit loop is used instead of a stream ending in {@code count()}: the state is
     * mutated while iterating, and {@code count()} is allowed to skip intermediate operations.
     * Applications are processed in list order, so a duplicate id within the list is added
     * only once, provided {@code addAppState} makes the id visible to {@code isStateNotPresent}.
     *
     * @param safariAnalysisPollerState the state to update and persist
     * @param list applications returned by the Spark History Server; null is treated as empty
     * @return the number of applications newly added to the state
     */
    @VisibleForTesting
    long processNewSparkApplications(SafariAnalysisPollerState safariAnalysisPollerState, List<SparkApplication> list) {
        long addedCount = 0;
        if (list == null || list.isEmpty()) {
            LOG.info("There are no new spark applications received from Spark History service.");
        } else {
            for (SparkApplication sparkApplication : list) {
                if (safariAnalysisPollerState.isStateNotPresent(sparkApplication.id)) {
                    safariAnalysisPollerState.addAppState(createNewApplicationState(sparkApplication));
                    addedCount++;
                }
            }
        }
        saveState(safariAnalysisPollerState);
        return addedCount;
    }

    /**
     * Queries the Spark History Server for completed applications since the poller start time.
     *
     * <p>Which query is used depends on the versions involved:
     * <ul>
     *   <li>CDH at or above {@code MIN_CDH_VERSION_SUPPORTED}: end-time based query.</li>
     *   <li>Spark 1: plain application query.</li>
     *   <li>Otherwise, by the Spark version reported by the server: anything other than
     *       2.0.x/2.1.x uses the end-time based query; 2.0.x/2.1.x uses the plain query when
     *       the "before 2.2" option is enabled, else an empty list is returned.</li>
     * </ul>
     *
     * <p>The start time is formatted once, holding the formatter's monitor, because the shared
     * {@link java.text.SimpleDateFormat} is not thread-safe.
     *
     * @param safariAnalysisPollerState state used to derive the start time
     * @param instant the reference "now"
     * @param sparkHistoryClient client to query; must not be null
     * @return the applications returned by the server, or an empty list when the Spark version
     *     is not supported
     * @throws NullPointerException if {@code sparkHistoryClient} is null
     */
    @VisibleForTesting
    List<SparkApplication> fetchNewSparkApplications(SafariAnalysisPollerState safariAnalysisPollerState, Instant instant, SparkHistoryClient sparkHistoryClient) {
        Preconditions.checkNotNull(sparkHistoryClient);
        Instant startTimeForPoller = getStartTimeForPoller(safariAnalysisPollerState, instant);
        final String startTime;
        synchronized (DATE_FORMATTER) {
            startTime = DATE_FORMATTER.format(startTimeForPoller.toDate());
        }
        if (VersionString.of(this.context.getCdhVersion()).compareTo(MIN_CDH_VERSION_SUPPORTED) >= 0) {
            return sparkHistoryClient.getApplicationsByEndTime(COMPLETED, startTime);
        }
        if (this.context.isSpark1()) {
            return sparkHistoryClient.getApplications(COMPLETED, startTime);
        }
        String sparkVersion = sparkHistoryClient.getSparkVersion().spark;
        boolean isBefore2_2 = sparkVersion.startsWith("2.0.") || sparkVersion.startsWith("2.1.");
        if (!isBefore2_2) {
            return sparkHistoryClient.getApplicationsByEndTime(COMPLETED, startTime);
        }
        if (this.context.getOptions().isSparkBefore2_2Supported()) {
            return sparkHistoryClient.getApplications(COMPLETED, startTime);
        }
        THROTTLED_LOG.info("Ignoring Spark applications because version {} is below 2.2", sparkVersion);
        return Lists.newArrayList();
    }

    /**
     * Builds the initial state entry for a newly seen application: its id, the {@code NEW_APP}
     * state and the current system time as creation time.
     *
     * <p>The creation time is set once from {@link System#currentTimeMillis()}; the earlier
     * additional assignment from {@code Instant.now()} was immediately overwritten.
     *
     * @param sparkApplication the application reported by the Spark History Server
     * @return the new state entry
     */
    private SafariAnalysisPollerState.SafariAppState createNewApplicationState(SparkApplication sparkApplication) {
        SafariAnalysisPollerState.SafariAppState safariAppState = new SafariAnalysisPollerState.SafariAppState();
        safariAppState.setAppId(sparkApplication.id);
        safariAppState.setSafariState(SafariAnalysisPollerState.SafariState.NEW_APP);
        safariAppState.setStateCreationTime(System.currentTimeMillis());
        LOG.info("Noticed a new application with id {}.", sparkApplication.id);
        return safariAppState;
    }

    /**
     * Loads the poller state from the state store under the key
     * {@code <service CDX id> + SAFARI_SUFFIX}, creating an empty state when none is stored.
     * {@code prepare()} is called on the state before it is returned.
     *
     * @return the stored state, or a fresh empty one
     */
    @VisibleForTesting
    SafariAnalysisPollerState loadState() {
        SafariAnalysisPollerState state = (SafariAnalysisPollerState) this.context.getStateStore().load(
                SafariAnalysisPollerState.class,
                new String[]{this.context.getService().getCdxId() + SAFARI_SUFFIX});
        if (state != null) {
            LOG.debug("Using the state from state store.");
        } else {
            LOG.info("There is no state for safari analysis. Creating new empty state object");
            state = new SafariAnalysisPollerState();
        }
        state.prepare();
        return state;
    }

    /**
     * Saves the poller state to the state store under the key
     * {@code <service CDX id> + SAFARI_SUFFIX}.
     *
     * @param safariAnalysisPollerState the state to persist
     */
    @VisibleForTesting
    void saveState(SafariAnalysisPollerState safariAnalysisPollerState) {
        // The state store is fetched once; the earlier extra lookup discarded its result.
        this.context.getStateStore().save(safariAnalysisPollerState, new String[]{this.context.getService().getCdxId() + SAFARI_SUFFIX});
    }

    /**
     * Creates a {@code SparkHistoryClient} through {@code ExtractorUtil.createClient}, using the
     * history-server URL, security and SSL settings and trust-manager provider from the context.
     *
     * @return the newly created client
     */
    @VisibleForTesting
    private SparkHistoryClient createShsClient() {
        Object client = ExtractorUtil.createClient(
                SparkHistoryClient.class,
                this.context.getShsUrl(),
                true,
                this.context.isSecure(),
                this.context.isSslEnabled(),
                this.context.getTrustManagerProvider());
        return (SparkHistoryClient) client;
    }

    /**
     * Closes the HTTP conduit that backs the given client.
     *
     * @param sparkHistoryClient the client to close
     * @throws IllegalArgumentException if the client's configuration has no HTTP conduit
     */
    @VisibleForTesting
    void closeClient(SparkHistoryClient sparkHistoryClient) {
        HTTPConduit conduit = WebClient.getConfig(sparkHistoryClient).getHttpConduit();
        if (conduit != null) {
            conduit.close();
            return;
        }
        throw new IllegalArgumentException("Client is not using the HTTP transport");
    }

    static {
        DATE_FORMATTER.setCalendar(Calendar.getInstance(new SimpleTimeZone(0, "GMT")));
    }
}
