001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord;
020
021import java.text.ParseException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Date;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import java.util.Map;
029import java.util.HashMap;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.commons.lang.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.oozie.CoordinatorActionBean;
035import org.apache.oozie.CoordinatorEngine;
036import org.apache.oozie.ErrorCode;
037import org.apache.oozie.XException;
038import org.apache.oozie.client.OozieClient;
039import org.apache.oozie.client.rest.RestConstants;
040import org.apache.oozie.command.CommandException;
041import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
042import org.apache.oozie.coord.input.logic.InputLogicParser;
043import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
044import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
045import org.apache.oozie.executor.jpa.JPAExecutorException;
046import org.apache.oozie.service.ConfigurationService;
047import org.apache.oozie.service.JPAService;
048import org.apache.oozie.service.Services;
049import org.apache.oozie.service.XLogService;
050import org.apache.oozie.sla.SLAOperations;
051import org.apache.oozie.util.CoordActionsInDateRange;
052import org.apache.oozie.util.DateUtils;
053import org.apache.oozie.util.Pair;
054import org.apache.oozie.util.ParamChecker;
055import org.apache.oozie.util.XLog;
056import org.apache.oozie.util.XmlUtils;
057import org.jdom.Element;
058import org.jdom.JDOMException;
059
060import com.google.common.annotations.VisibleForTesting;
061
062
063public class CoordUtils {
064    public static final String HADOOP_USER = "user.name";
065
066    public static String getDoneFlag(Element doneFlagElement) {
067        if (doneFlagElement != null) {
068            return doneFlagElement.getTextTrim();
069        }
070        else {
071            return CoordELConstants.DEFAULT_DONE_FLAG;
072        }
073    }
074
075    public static Configuration getHadoopConf(Configuration jobConf) {
076        Configuration conf = new Configuration();
077        ParamChecker.notNull(jobConf, "Configuration to be used for hadoop setup ");
078        String user = ParamChecker.notEmpty(jobConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
079        conf.set(HADOOP_USER, user);
080        return conf;
081    }
082
083    /**
084     * Get the list of actions for a given coordinator job
085     * @param rangeType the rerun type (date, action)
086     * @param jobId the coordinator job id
087     * @param scope the date scope or action id scope
088     * @return the list of Coordinator actions
089     * @throws CommandException
090     */
091    public static List<CoordinatorActionBean> getCoordActions(String rangeType, String jobId, String scope,
092            boolean active) throws CommandException {
093        List<CoordinatorActionBean> coordActions = null;
094        if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_DATE)) {
095            coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope, active);
096        }
097        else if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) {
098            coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
099        }
100        return coordActions;
101    }
102
103    /**
104     * Get the list of actions for given date ranges
105     *
106     * @param jobId coordinator job id
107     * @param scope a comma-separated list of date ranges. Each date range element is specified with two dates separated by '::'
108     * @return the list of Coordinator actions for the date range
109     * @throws CommandException thrown if failed to get coordinator actions by given date range
110     */
111    @VisibleForTesting
112    public static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active)
113            throws CommandException {
114        JPAService jpaService = Services.get().get(JPAService.class);
115        ParamChecker.notEmpty(jobId, "jobId");
116        ParamChecker.notEmpty(scope, "scope");
117
118        Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>();
119        String[] list = scope.split(",");
120        for (String s : list) {
121            s = s.trim();
122            // A date range is specified with two dates separated by '::'
123            if (s.contains("::")) {
124            List<CoordinatorActionBean> listOfActions;
125            try {
126                // Get list of actions within the range of date
127                listOfActions = CoordActionsInDateRange.getCoordActionsFromDateRange(jobId, s, active);
128            }
129            catch (XException e) {
130                throw new CommandException(e);
131            }
132            actionSet.addAll(listOfActions);
133            }
134            else {
135                try {
136                    // Get action for the nominal time
137                    Date date = DateUtils.parseDateOozieTZ(s.trim());
138                    CoordinatorActionBean coordAction = jpaService
139                            .execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date));
140
141                    if (coordAction != null) {
142                        actionSet.add(coordAction);
143                    }
144                    else {
145                        throw new RuntimeException("This should never happen, Coordinator Action shouldn't be null");
146                    }
147                }
148                catch (ParseException e) {
149                    throw new CommandException(ErrorCode.E0302, s.trim(), e);
150                }
151                catch (JPAExecutorException e) {
152                    if (e.getErrorCode() == ErrorCode.E0605) {
153                        XLog.getLog(CoordUtils.class).info("No action for nominal time:" + s + ". Skipping over");
154                    }
155                    throw new CommandException(e);
156                }
157
158            }
159        }
160
161        List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
162        for (CoordinatorActionBean coordAction : actionSet) {
163            coordActions.add(coordAction);
164        }
165        return coordActions;
166    }
167
168    public static Set<String> getActionsIds(String jobId, String scope) throws CommandException {
169        ParamChecker.notEmpty(jobId, "jobId");
170        ParamChecker.notEmpty(scope, "scope");
171
172        Set<String> actions = new HashSet<String>();
173        String[] list = scope.split(",");
174        for (String s : list) {
175            s = s.trim();
176            // An action range is specified with two actions separated by '-'
177            if (s.contains("-")) {
178                String[] range = s.split("-");
179                // Check the format for action's range
180                if (range.length != 2) {
181                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', an example of correct format is 1-5");
182                }
183                int start;
184                int end;
185                //Get the starting and ending action numbers
186                try {
187                    start = Integer.parseInt(range[0].trim());
188                } catch (NumberFormatException ne) {
189                    throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", ne);
190                }
191                try {
192                    end = Integer.parseInt(range[1].trim());
193                } catch (NumberFormatException ne) {
194                    throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", ne);
195                }
196                if (start > end) {
197                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', starting action"
198                            + "number of the range should be less than ending action number, an example will be 1-4");
199                }
200                // Add the actionIds
201                for (int i = start; i <= end; i++) {
202                    actions.add(jobId + "@" + i);
203                }
204            }
205            else {
206                try {
207                    Integer.parseInt(s);
208                }
209                catch (NumberFormatException ne) {
210                    throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
211                            + "'. Integer only.");
212                }
213                actions.add(jobId + "@" + s);
214            }
215        }
216        return actions;
217    }
218
219    /**
220     * Get the list of actions for given id ranges
221     *
222     * @param jobId coordinator job id
223     * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-'
224     * @return the list of all Coordinator actions for action range
225     * @throws CommandException thrown if failed to get coordinator actions by given id range
226     */
227     @VisibleForTesting
228     public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
229        JPAService jpaService = Services.get().get(JPAService.class);
230        Set<String> actions = getActionsIds(jobId, scope);
231        // Retrieve the actions using the corresponding actionIds
232        List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
233        for (String id : actions) {
234            CoordinatorActionBean coordAction = null;
235            try {
236                coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id));
237            }
238            catch (JPAExecutorException je) {
239                if (je.getErrorCode().equals(ErrorCode.E0605)) { //ignore retrieval of non-existent actions in range
240                    XLog.getLog(XLogService.class).warn(
241                            "Coord action ID num [{0}] not yet materialized. Hence skipping over it for Kill action",
242                            id.substring(id.indexOf("@") + 1));
243                    continue;
244                }
245                else {
246                    throw new CommandException(je);
247                }
248            }
249            coordActions.add(coordAction);
250        }
251        return coordActions;
252    }
253
254     /**
255      * Check if sla alert is disabled for action.
256      * @param actionBean
257      * @param coordName
258      * @param jobConf
259      * @return
260      * @throws ParseException
261      */
262    public static boolean isSlaAlertDisabled(CoordinatorActionBean actionBean, String coordName, Configuration jobConf)
263            throws ParseException {
264
265        int disableSlaNotificationOlderThan = jobConf.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
266                ConfigurationService.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN));
267
268        if (disableSlaNotificationOlderThan > 0) {
269            // Disable alert for catchup jobs
270            long timeDiffinHrs = TimeUnit.MILLISECONDS.toHours(new Date().getTime()
271                    - actionBean.getNominalTime().getTime());
272            if (timeDiffinHrs > jobConf.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
273                    ConfigurationService.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN))) {
274                return true;
275            }
276        }
277
278        boolean disableAlert = false;
279        if (jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD) != null) {
280            String coords = jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD);
281            Set<String> coordsToDisableFor = new HashSet<String>(Arrays.asList(coords.split(",")));
282            if (coordsToDisableFor.contains(coordName)) {
283                return true;
284            }
285            if (coordsToDisableFor.contains(actionBean.getJobId())) {
286                return true;
287            }
288        }
289
290        // Check if sla alert is disabled for that action
291        if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_DISABLE_ALERT))
292                && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_DISABLE_ALERT)) {
293            return true;
294        }
295
296        // Check if sla alert is enabled for that action
297        if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_ENABLE_ALERT))
298                && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_ENABLE_ALERT)) {
299            return false;
300        }
301
302        return disableAlert;
303    }
304
305    /**
306     * Get coord action SLA alert status.
307     * @param actionBean
308     * @param coordName
309     * @param jobConf
310     * @param slaAlertType
311     * @return
312     * @throws ParseException
313     */
314    private static boolean getCoordActionSLAAlertStatus(CoordinatorActionBean actionBean, String coordName,
315            Configuration jobConf, String slaAlertType) throws ParseException {
316        String slaAlertList;
317
318       if (!StringUtils.isEmpty(jobConf.get(slaAlertType))) {
319            slaAlertList = jobConf.get(slaAlertType);
320            // check if ALL or date/action-num range
321            if (slaAlertList.equalsIgnoreCase(SLAOperations.ALL_VALUE)) {
322                return true;
323            }
324            String[] values = slaAlertList.split(",");
325            for (String value : values) {
326                value = value.trim();
327                if (value.contains("::")) {
328                    String[] datesInRange = value.split("::");
329                    Date start = DateUtils.parseDateOozieTZ(datesInRange[0].trim());
330                    Date end = DateUtils.parseDateOozieTZ(datesInRange[1].trim());
331                    // check if nominal time in this range
332                    if (actionBean.getNominalTime().compareTo(start) >= 0
333                            || actionBean.getNominalTime().compareTo(end) <= 0) {
334                        return true;
335                    }
336                }
337                else if (value.contains("-")) {
338                    String[] actionsInRange = value.split("-");
339                    int start = Integer.parseInt(actionsInRange[0].trim());
340                    int end = Integer.parseInt(actionsInRange[1].trim());
341                    // check if action number in this range
342                    if (actionBean.getActionNumber() >= start || actionBean.getActionNumber() <= end) {
343                        return true;
344                    }
345                }
346                else {
347                    int actionNumber = Integer.parseInt(value.trim());
348                    if (actionBean.getActionNumber() == actionNumber) {
349                        return true;
350                    }
351                }
352            }
353        }
354        return false;
355    }
356
357    // Form the where clause to filter by status values
358    public static Map<String, Object> getWhereClause(StringBuilder sb, Map<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>,
359            List<Object>> filterMap) {
360        Map<String, Object> params = new HashMap<String, Object>();
361        int pcnt= 1;
362        for (Map.Entry<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) {
363            String field = filter.getKey().getFist();
364            CoordinatorEngine.FILTER_COMPARATORS comp = filter.getKey().getSecond();
365            String sqlField;
366            if (field.equals(OozieClient.FILTER_STATUS)) {
367                sqlField = "a.statusStr";
368            } else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) {
369                sqlField = "a.nominalTimestamp";
370            } else {
371                throw new IllegalArgumentException("Invalid filter key " + field);
372            }
373
374            sb.append(" and ").append(sqlField).append(" ");
375            switch (comp) {
376                case EQUALS:
377                    sb.append("IN (");
378                    params.putAll(appendParams(sb, filter.getValue(), pcnt));
379                    sb.append(")");
380                    break;
381
382                case NOT_EQUALS:
383                    sb.append("NOT IN (");
384                    params.putAll(appendParams(sb, filter.getValue(), pcnt));
385                    sb.append(")");
386                    break;
387
388                case GREATER:
389                case GREATER_EQUAL:
390                case LESSTHAN:
391                case LESSTHAN_EQUAL:
392                    if (filter.getValue().size() != 1) {
393                        throw new IllegalArgumentException(field + comp.getSign() + " can't have more than 1 values");
394                    }
395
396                    sb.append(comp.getSign()).append(" ");
397                    params.putAll(appendParams(sb, filter.getValue(), pcnt));
398                    break;
399            }
400
401            pcnt += filter.getValue().size();
402        }
403        sb.append(" ");
404        return params;
405    }
406
407    private static Map<String, Object> appendParams(StringBuilder sb, List<Object> value, int sindex) {
408        Map<String, Object> params = new HashMap<String, Object>();
409        boolean first = true;
410        for (Object val : value) {
411            String pname = "p" + sindex++;
412            params.put(pname, val);
413            if (!first) {
414                sb.append(", ");
415            }
416            sb.append(':').append(pname);
417            first = false;
418        }
419        return params;
420    }
421
422    public static boolean isInputLogicSpecified(String actionXml) throws JDOMException {
423        return isInputLogicSpecified(XmlUtils.parseXml(actionXml));
424    }
425
426    public static boolean isInputLogicSpecified(Element eAction) throws JDOMException {
427        return eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()) != null;
428    }
429
430    public static String getInputLogic(String actionXml) throws JDOMException {
431        return getInputLogic(XmlUtils.parseXml(actionXml));
432    }
433
434    public static String getInputLogic(Element actionXml) throws JDOMException {
435        return new InputLogicParser().parse(actionXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC,
436                actionXml.getNamespace()));
437    }
438
439}