This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.CoordinatorJobBean;
022import org.apache.oozie.client.Job;
023import org.apache.oozie.service.SchemaService;
024import org.apache.oozie.service.Services;
025import org.apache.oozie.service.StatusTransitService;
026
027public class StatusUtils {
028
029    /**
030     * This Function transforms the statuses based on the name space of the coordinator App
031     *
032     * @param coordJob This will be the coordinator job bean for which we need to get the status based on version
033     * @return Job.Status This would be the new status based on the app version.
034     */
035    public static Job.Status getStatus(CoordinatorJobBean coordJob) {
036        Job.Status newStatus = null;
037        if (coordJob != null) {
038            newStatus = coordJob.getStatus();
039            Configuration conf = Services.get().getConf();
040            boolean backwardSupportForCoordStatus = conf.getBoolean(
041                    StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
042            if (backwardSupportForCoordStatus) {
043                if (coordJob.getAppNamespace() != null
044                        && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
045
046                    if (coordJob.getStatus() == Job.Status.DONEWITHERROR) {
047                        newStatus = Job.Status.SUCCEEDED;
048                    }
049                    else if (coordJob.getStatus() == Job.Status.PAUSED
050                            || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
051                        newStatus = Job.Status.RUNNING;
052                    }
053                    else if ((coordJob.getStatus() == Job.Status.RUNNING || coordJob.getStatus() == Job.Status.RUNNINGWITHERROR)
054                            && coordJob.isDoneMaterialization()) {
055                        newStatus = Job.Status.SUCCEEDED;
056                    }
057                    else if (coordJob.getStatus() == Job.Status.PREPSUSPENDED) {
058                        newStatus = Job.Status.SUSPENDED;
059                    }
060                    else if (coordJob.getStatus() == Job.Status.PREPPAUSED) {
061                        newStatus = Job.Status.PREP;
062                    }
063                }
064            }
065        }
066        return newStatus;
067    }
068
069    /**
070     * This function changes back the status for coordinator rerun if the job was SUCCEEDED or SUSPENDED when rerun
071     * with backward support is true.
072     *
073     * @param coordJob This will be the coordinator job bean for which we need to get the status based on version
074     * @param prevStatus coordinator job previous status
075     * @return Job.Status This would be the new status based on the app version.
076     */
077    public static Job.Status getStatusForCoordRerun(CoordinatorJobBean coordJob, Job.Status prevStatus) {
078        Job.Status newStatus = null;
079        if (coordJob != null) {
080            newStatus = coordJob.getStatus();
081            Configuration conf = Services.get().getConf();
082            boolean backwardSupportForCoordStatus = conf.getBoolean(
083                    StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
084            if (backwardSupportForCoordStatus) {
085                if (coordJob.getAppNamespace() != null
086                        && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
087
088                    if (prevStatus == Job.Status.SUSPENDED) {
089                        newStatus = Job.Status.SUSPENDED;
090                    }
091                    else if (coordJob.isDoneMaterialization() || prevStatus == Job.Status.SUCCEEDED) {
092                        newStatus = Job.Status.SUCCEEDED;
093                        coordJob.setDoneMaterialization();
094                    }
095                }
096            }
097        }
098        return newStatus;
099    }
100
101    /**
102     * This function check if eligible to do action input check  when running with backward support is true.
103     *
104     * @param coordJob This will be the coordinator job bean for which we need to get the status based on version
105     * @return true if eligible to do action input check
106     */
107    public static boolean getStatusForCoordActionInputCheck(CoordinatorJobBean coordJob) {
108        boolean ret = false;
109        if (coordJob != null) {
110            Configuration conf = Services.get().getConf();
111            boolean backwardSupportForCoordStatus = conf.getBoolean(
112                    StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
113            if (backwardSupportForCoordStatus) {
114                if (coordJob.getAppNamespace() != null
115                        && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
116
117                    if (coordJob.getStatus() == Job.Status.SUCCEEDED) {
118                        ret = true;
119                    }
120                    else if (coordJob.getStatus() == Job.Status.SUSPENDED) {
121                        ret = true;
122                    }
123                }
124            }
125        }
126        return ret;
127    }
128
129    /**
130     * If namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
131     *
132     * @param coordJob the coordinator job
133     * @return true if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
134     */
135    public static boolean isV1CoordjobKillable(CoordinatorJobBean coordJob) {
136        boolean ret = false;
137        if (coordJob != null) {
138            Configuration conf = Services.get().getConf();
139            boolean backwardSupportForCoordStatus = conf.getBoolean(
140                    StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
141            if (backwardSupportForCoordStatus) {
142                if (coordJob.getAppNamespace() != null
143                        && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
144                    if (coordJob.getStatus() == Job.Status.SUCCEEDED) {
145                        ret = true;
146                    }
147                }
148            }
149        }
150        return ret;
151    }
152
153    /**
154     * Get the status of coordinator job for Oozie versions (3.2 and before) when RUNNINGWITHERROR,
155     * SUSPENDEDWITHERROR and PAUSEDWITHERROR are not supported
156     * @param coordJob
157     * @return
158     */
159    public static Job.Status getStatusIfBackwardSupportTrue(Job.Status currentJobStatus) {
160        Job.Status newStatus = currentJobStatus;
161        Configuration conf = Services.get().getConf();
162        boolean backwardSupportForStatesWithoutError = conf.getBoolean(
163                StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, true);
164        if (backwardSupportForStatesWithoutError) {
165            if (currentJobStatus == Job.Status.PAUSEDWITHERROR) {
166                newStatus = Job.Status.PAUSED;
167            }
168            else if (currentJobStatus == Job.Status.SUSPENDEDWITHERROR) {
169                newStatus = Job.Status.SUSPENDED;
170            }
171            else if (currentJobStatus == Job.Status.RUNNINGWITHERROR) {
172                newStatus = Job.Status.RUNNING;
173            }
174        }
175
176        return newStatus;
177    }
178
179}