001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.oozie.CoordinatorActionBean;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.client.CoordinatorAction;
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.command.CommandException;
032    import org.apache.oozie.coord.CoordELEvaluator;
033    import org.apache.oozie.coord.CoordELFunctions;
034    import org.apache.oozie.service.HadoopAccessorException;
035    import org.apache.oozie.service.HadoopAccessorService;
036    import org.apache.oozie.service.Services;
037    import org.apache.oozie.store.CoordinatorStore;
038    import org.apache.oozie.store.StoreException;
039    import org.apache.oozie.util.DateUtils;
040    import org.apache.oozie.util.ELEvaluator;
041    import org.apache.oozie.util.Instrumentation;
042    import org.apache.oozie.util.ParamChecker;
043    import org.apache.oozie.util.XConfiguration;
044    import org.apache.oozie.util.XLog;
045    import org.apache.oozie.util.XmlUtils;
046    import org.jdom.Element;
047    
048    public class CoordActionInputCheckCommand extends CoordinatorCommand<Void> {
049    
050        private String actionId;
051        private final XLog log = XLog.getLog(getClass());
052        private int COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
053        private CoordinatorActionBean coordAction = null;
054    
055        public CoordActionInputCheckCommand(String actionId) {
056            super("coord_action_input", "coord_action_input", 1, XLog.STD);
057            this.actionId = actionId;
058        }
059    
060        @Override
061        protected Void call(CoordinatorStore store) throws StoreException, CommandException {
062            log.debug("After store.get() for action ID " + actionId + " : " + coordAction.getStatus());
063            // this action should only get processed if current time >
064            // materialization time
065            // otherwise, requeue this action after 30 seconds
066            Date nominalTime = coordAction.getNominalTime();
067            Date currentTime = new Date();
068            if (nominalTime.compareTo(currentTime) > 0) {
069                log.info("[" + actionId
070                        + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
071                        + currentTime + ", nominal=" + nominalTime);
072                queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), Math.max(
073                        (nominalTime.getTime() - currentTime.getTime()), COMMAND_REQUEUE_INTERVAL));
074                // update lastModifiedTime
075                store.updateCoordinatorAction(coordAction);
076                return null;
077            }
078            if (coordAction.getStatus() == CoordinatorActionBean.Status.WAITING) {
079                log.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
080                StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());// job.getXml();
081                Instrumentation.Cron cron = new Instrumentation.Cron();
082                try {
083                    Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
084                    cron.start();
085                    StringBuilder existList = new StringBuilder();
086                    StringBuilder nonExistList = new StringBuilder();
087                    StringBuilder nonResolvedList = new StringBuilder();
088                    CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
089    
090                    String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
091                    if (uriList.length > 0) {
092                        log.info("[" + actionId + "]::ActionInputCheck:: Missing deps:" + uriList[0] + ",  NonResolvedList:"
093                                + nonResolvedList.toString());
094                    } else {
095                        log.info("[" + actionId + "]::ActionInputCheck:: No missing deps,  NonResolvedList:"
096                                + nonResolvedList.toString());
097                    }
098                    boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
099                    coordAction.setLastModifiedTime(currentTime);
100                    coordAction.setActionXml(actionXml.toString());
101                    if (nonResolvedList.length() > 0 && status == false) {
102                        nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
103                    }
104                    coordAction.setMissingDependencies(nonExistList.toString());
105                    if (status == true) {
106                        coordAction.setStatus(CoordinatorAction.Status.READY);
107                        // pass jobID to the ReadyCommand
108                        queueCallable(new CoordActionReadyCommand(coordAction.getJobId()), 100);
109                    }
110                    else {
111                        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(),
112                                coordAction.getCreatedTime().getTime())) / (60 * 1000);
113                        int timeOut = coordAction.getTimeOut();
114                        if ((timeOut >= 0) && (waitingTime > timeOut)) {
115                            queueCallable(new CoordActionTimeOut(coordAction), 100);
116                            coordAction.setStatus(CoordinatorAction.Status.TIMEDOUT);
117                        }
118                        else {
119                            queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), COMMAND_REQUEUE_INTERVAL);
120                        }
121                    }
122                    store.updateCoordActionMin(coordAction);
123                }
124                catch (Exception e) {
125                    log.warn(actionId + ": Exception occurs: " + e + " STORE is active " + store.isActive(), e);
126                    throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
127                }
128                cron.stop();
129            }
130            else {
131                log.info("[" + actionId + "]::ActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
132                        + coordAction.getStatus());
133            }
134            return null;
135        }
136    
137        protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
138                Configuration conf) throws Exception {
139            Element eAction = XmlUtils.parseXml(actionXml.toString());
140            boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
141            if (allExist) {
142                log.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
143                allExist = checkUnresolvedInstances(eAction, conf);
144            }
145            if (allExist == true) {
146                materializeDataProperties(eAction, conf);
147                actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
148            }
149            return allExist;
150        }
151    
152        /**
153         * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
154         * of files that will be needed.
155         *
156         * @param eAction
157         * @param conf
158         * @throws Exception
159         * @update modify 'Action' element with appropriate list of files.
160         */
161        private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
162            ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
163            Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
164                    eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
165            if (configElem != null) {
166                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
167                    resolveTagContents("value", propElem, eval);
168                }
169            }
170        }
171    
172        private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
173            if (elem == null) {
174                return;
175            }
176            Element tagElem = elem.getChild(tagName, elem.getNamespace());
177            if (tagElem != null) {
178                String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
179                tagElem.removeContent();
180                tagElem.addContent(updated);
181            }
182            else {
183                log.warn(" Value NOT FOUND " + tagName);
184            }
185        }
186    
187        private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf)
188                throws Exception {
189            String strAction = XmlUtils.prettyPrint(eAction).toString();
190            Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
191            String actualTimeStr = eAction.getAttributeValue("action-actual-time");
192            Date actualTime = null;
193            if (actualTimeStr == null) {
194                log.debug("Unable to get action-actual-time from action xml, this job is submitted " +
195                "from previous version. Assign current date to actual time, action = " + actionId);
196                actualTime = new Date();
197            } else {
198                actualTime = DateUtils.parseDateUTC(actualTimeStr);
199            }
200    
201            StringBuffer resultedXml = new StringBuffer();
202    
203            boolean ret;
204            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
205            if (inputList != null) {
206                ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
207                        actualTime, actionConf);
208                if (ret == false) {
209                    resultedXml.append(strAction);
210                    return false;
211                }
212            }
213    
214            // Using latest() or future() in output-event is not intuitive.
215            // We need to make
216            // sure, this assumption is correct.
217            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
218            if (outputList != null) {
219                for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
220                    if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
221                        throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
222                                " not permitted in output-event ");
223                    }
224                }
225                /*
226                 * ret = materializeUnresolvedEvent( (List<Element>)
227                 * outputList.getChildren("data-out", eAction.getNamespace()),
228                 * actualTime, nominalTime, actionConf); if (ret == false) {
229                 * resultedXml.append(strAction); return false; }
230                 */
231            }
232            return true;
233        }
234    
235        private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
236                Configuration conf) throws Exception {
237            for (Element dEvent : eDataEvents) {
238                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
239                    continue;
240                }
241                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
242                String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
243                String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
244                StringBuffer resolvedTmp = new StringBuffer();
245                for (int i = 0; i < unresolvedList.length; i++) {
246                    String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
247                    Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
248                    if (isResolved == false) {
249                        log.info("[" + actionId + "]::Cannot resolve: " + ret);
250                        return false;
251                    }
252                    if (resolvedTmp.length() > 0) {
253                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
254                    }
255                    resolvedTmp.append((String) eval.getVariable("resolved_path"));
256                }
257                if (resolvedTmp.length() > 0) {
258                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
259                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
260                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
261                        dEvent.removeChild("uris", dEvent.getNamespace());
262                    }
263                    Element uriInstance = new Element("uris", dEvent.getNamespace());
264                    uriInstance.addContent(resolvedTmp.toString());
265                    dEvent.getContent().add(1, uriInstance);
266                }
267                dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
268            }
269    
270            return true;
271        }
272    
273        private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
274                Configuration conf) throws IOException {
275    
276            log.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
277            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
278            if (inputList != null) {
279                // List<Element> eDataEvents = inputList.getChildren("data-in",
280                // eAction.getNamespace());
281                // for (Element event : eDataEvents) {
282                // Element uris = event.getChild("uris", event.getNamespace());
283                if (nonExistList.length() > 0) {
284                    checkListOfPaths(existList, nonExistList, conf);
285                }
286                // }
287                return nonExistList.length() == 0;
288            }
289            return true;
290        }
291    
292        private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
293                throws IOException {
294    
295            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
296            if (uriList[0] != null) {
297                log.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
298            }
299    
300            nonExistList.delete(0, nonExistList.length());
301            boolean allExists = true;
302            String existSeparator = "", nonExistSeparator = "";
303            for (int i = 0; i < uriList.length; i++) {
304                if (allExists) {
305                    allExists = pathExists(uriList[i], conf);
306                    log.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
307                }
308                if (allExists) {
309                    existList.append(existSeparator).append(uriList[i]);
310                    existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
311                }
312                else {
313                    nonExistList.append(nonExistSeparator).append(uriList[i]);
314                    nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
315                }
316            }
317            return allExists;
318        }
319    
320        private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
321            log.debug("checking for the file " + sPath);
322            Path path = new Path(sPath);
323            String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
324            String group = ParamChecker.notEmpty(actionConf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
325            try {
326                return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
327                        actionConf).exists(path);
328            }
329            catch (HadoopAccessorException e) {
330                throw new IOException(e);
331            }
332        }
333    
334        /**
335         * The function create a list of URIs separated by "," using the instances time stamp and URI-template
336         *
337         * @param event : <data-in> event
338         * @param instances : List of time stamp seprated by ","
339         * @param unresolvedInstances : list of instance with latest/future function
340         * @return : list of URIs separated by ",".
341         * @throws Exception
342         */
343        private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
344            if (instances == null || instances.length() == 0) {
345                return "";
346            }
347            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
348            StringBuilder uris = new StringBuilder();
349    
350            for (int i = 0; i < instanceList.length; i++) {
351                int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
352                if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
353                    if (unresolvedInstances.length() > 0) {
354                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
355                    }
356                    unresolvedInstances.append(instanceList[i]);
357                    continue;
358                }
359                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
360                // uris.append(eval.evaluate(event.getChild("dataset",
361                // event.getNamespace()).getChild("uri-template",
362                // event.getNamespace()).getTextTrim(), String.class));
363                if (uris.length() > 0) {
364                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
365                }
366                uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
367                        "uri-template", event.getNamespace()).getTextTrim()));
368            }
369            return uris.toString();
370        }
371    
372        @Override
373        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
374            log.info("STARTED CoordActionInputCheckCommand for actionid=" + actionId);
375            try {
376                coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId);
377                setLogInfo(coordAction);
378                if (lock(coordAction.getJobId())) {
379                    call(store);
380                }
381                else {
382                    queueCallable(new CoordActionInputCheckCommand(actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
383                    log.warn("CoordActionInputCheckCommand lock was not acquired - failed jobId=" + coordAction.getJobId()
384                            + ", actionId=" + actionId + ". Requeing the same.");
385                }
386            }
387            catch (InterruptedException e) {
388                queueCallable(new CoordActionInputCheckCommand(actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
389                log.warn("CoordActionInputCheckCommand lock acquiring failed with exception " + e.getMessage()
390                        + " for jobId=" + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same.");
391            }
392            finally {
393                log.info("ENDED CoordActionInputCheckCommand for actionid=" + actionId);
394            }
395            return null;
396        }
397    
398        /* (non-Javadoc)
399         * @see org.apache.oozie.command.Command#getKey()
400         */
401        @Override
402        public String getKey(){
403            return getName() + "_" + actionId;
404        }
405    
406    }