001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.HashMap;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.oozie.BundleActionBean;
030    import org.apache.oozie.BundleJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.XException;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.client.rest.JsonBean;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.PreconditionException;
038    import org.apache.oozie.command.StartTransitionXCommand;
039    import org.apache.oozie.command.coord.CoordSubmitXCommand;
040    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043    import org.apache.oozie.executor.jpa.JPAExecutorException;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.Services;
046    import org.apache.oozie.util.JobUtils;
047    import org.apache.oozie.util.LogUtils;
048    import org.apache.oozie.util.ParamChecker;
049    import org.apache.oozie.util.XConfiguration;
050    import org.apache.oozie.util.XmlUtils;
051    import org.jdom.Attribute;
052    import org.jdom.Element;
053    import org.jdom.JDOMException;
054    
055    /**
056     * The command to start Bundle job
057     */
058    public class BundleStartXCommand extends StartTransitionXCommand {
059        private final String jobId;
060        private BundleJobBean bundleJob;
061        private JPAService jpaService = null;
062    
063        /**
064         * The constructor for class {@link BundleStartXCommand}
065         *
066         * @param jobId the bundle job id
067         */
068        public BundleStartXCommand(String jobId) {
069            super("bundle_start", "bundle_start", 1);
070            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
071        }
072    
073        /**
074         * The constructor for class {@link BundleStartXCommand}
075         *
076         * @param jobId the bundle job id
077         * @param dryrun true if dryrun is enable
078         */
079        public BundleStartXCommand(String jobId, boolean dryrun) {
080            super("bundle_start", "bundle_start", 1, dryrun);
081            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
082        }
083    
084        /* (non-Javadoc)
085         * @see org.apache.oozie.command.XCommand#getEntityKey()
086         */
087        @Override
088        public String getEntityKey() {
089            return jobId;
090        }
091    
092        @Override
093        public String getKey() {
094            return getName() + "_" + jobId;
095        }
096    
097        /* (non-Javadoc)
098         * @see org.apache.oozie.command.XCommand#isLockRequired()
099         */
100        @Override
101        protected boolean isLockRequired() {
102            return true;
103        }
104    
105        /* (non-Javadoc)
106         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
107         */
108        @Override
109        protected void verifyPrecondition() throws CommandException, PreconditionException {
110            eagerVerifyPrecondition();
111        }
112    
113        /* (non-Javadoc)
114         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
115         */
116        @Override
117        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
118            if (bundleJob.getStatus() != Job.Status.PREP) {
119                String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
120                LOG.info(msg);
121                throw new PreconditionException(ErrorCode.E1100, msg);
122            }
123        }
124        /* (non-Javadoc)
125         * @see org.apache.oozie.command.XCommand#loadState()
126         */
127        @Override
128        public void loadState() throws CommandException {
129            eagerLoadState();
130        }
131    
132        /* (non-Javadoc)
133         * @see org.apache.oozie.command.XCommand#eagerLoadState()
134         */
135        @Override
136        public void eagerLoadState() throws CommandException {
137            try {
138                jpaService = Services.get().get(JPAService.class);
139    
140                if (jpaService != null) {
141                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
142                    LogUtils.setLogInfo(bundleJob, logInfo);
143                    super.setJob(bundleJob);
144    
145                }
146                else {
147                    throw new CommandException(ErrorCode.E0610);
148                }
149            }
150            catch (XException ex) {
151                throw new CommandException(ex);
152            }
153        }
154    
155        /* (non-Javadoc)
156         * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
157         */
158        @Override
159        public void StartChildren() throws CommandException {
160            LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
161            insertBundleActions();
162            startCoordJobs();
163            LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
164        }
165    
166        /* (non-Javadoc)
167         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
168         */
169        @Override
170        public void notifyParent() {
171        }
172    
173        /* (non-Javadoc)
174         * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
175         */
176        @Override
177        public void performWrites() throws CommandException {
178            try {
179                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
180            }
181            catch (JPAExecutorException e) {
182                throw new CommandException(e);
183            }
184        }
185    
186        /**
187         * Insert bundle actions
188         *
189         * @throws CommandException thrown if failed to create bundle actions
190         */
191        @SuppressWarnings("unchecked")
192        private void insertBundleActions() throws CommandException {
193            if (bundleJob != null) {
194                Map<String, Boolean> map = new HashMap<String, Boolean>();
195                try {
196                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
197                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
198                    for (Element elem : coordElems) {
199                        Attribute name = elem.getAttribute("name");
200                        Attribute critical = elem.getAttribute("critical");
201                        if (name != null) {
202                            if (map.containsKey(name.getValue())) {
203                                throw new CommandException(ErrorCode.E1304, name);
204                            }
205                            boolean isCritical = false;
206                            if (critical != null && Boolean.parseBoolean(critical.getValue())) {
207                                isCritical = true;
208                            }
209                            map.put(name.getValue(), isCritical);
210                        }
211                        else {
212                            throw new CommandException(ErrorCode.E1305);
213                        }
214                    }
215                }
216                catch (JDOMException jex) {
217                    throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
218                }
219    
220                // if there is no coordinator for this bundle, failed it.
221                if (map.isEmpty()) {
222                    bundleJob.setStatus(Job.Status.FAILED);
223                    bundleJob.resetPending();
224                    try {
225                        jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
226                    }
227                    catch (JPAExecutorException jex) {
228                        throw new CommandException(jex);
229                    }
230    
231                    LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
232                    throw new CommandException(ErrorCode.E1318, jobId);
233                }
234    
235                for (Entry<String, Boolean> coordName : map.entrySet()) {
236                    BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
237                    insertList.add(action);
238                }
239            }
240            else {
241                throw new CommandException(ErrorCode.E0604, jobId);
242            }
243        }
244    
245        private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
246            BundleActionBean action = new BundleActionBean();
247            action.setBundleActionId(jobId + "_" + coordName);
248            action.setBundleId(jobId);
249            action.setCoordName(coordName);
250            action.setStatus(Job.Status.PREP);
251            action.setLastModifiedTime(new Date());
252            if (isCritical) {
253                action.setCritical();
254            }
255            else {
256                action.resetCritical();
257            }
258            return action;
259        }
260    
261        /**
262         * Start Coord Jobs
263         *
264         * @throws CommandException thrown if failed to start coord jobs
265         */
266        @SuppressWarnings("unchecked")
267        private void startCoordJobs() throws CommandException {
268            if (bundleJob != null) {
269                try {
270                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
271                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
272                    for (Element coordElem : coordElems) {
273                        Attribute name = coordElem.getAttribute("name");
274                        Configuration coordConf = mergeConfig(coordElem);
275                        coordConf.set(OozieClient.BUNDLE_ID, jobId);
276    
277                        queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
278    
279                    }
280                    updateBundleAction();
281                }
282                catch (JDOMException jex) {
283                    throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
284                }
285                catch (JPAExecutorException je) {
286                    throw new CommandException(je);
287                }
288            }
289            else {
290                throw new CommandException(ErrorCode.E0604, jobId);
291            }
292        }
293    
294        private void updateBundleAction() throws JPAExecutorException {
295            for(JsonBean bAction : insertList) {
296                BundleActionBean action = (BundleActionBean) bAction;
297                action.incrementAndGetPending();
298                action.setLastModifiedTime(new Date());
299            }
300        }
301    
302        /**
303         * Merge Bundle job config and the configuration from the coord job to pass
304         * to Coord Engine
305         *
306         * @param coordElem the coordinator configuration
307         * @return Configuration merged configuration
308         * @throws CommandException thrown if failed to merge configuration
309         */
310        private Configuration mergeConfig(Element coordElem) throws CommandException {
311            String jobConf = bundleJob.getConf();
312            // Step 1: runConf = jobConf
313            Configuration runConf = null;
314            try {
315                runConf = new XConfiguration(new StringReader(jobConf));
316            }
317            catch (IOException e1) {
318                LOG.warn("Configuration parse error in:" + jobConf);
319                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
320            }
321            // Step 2: Merge local properties into runConf
322            // extract 'property' tags under 'configuration' block in the coordElem
323            // convert Element to XConfiguration
324            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
325    
326            if (localConfigElement != null) {
327                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
328                Configuration localConf;
329                try {
330                    localConf = new XConfiguration(new StringReader(strConfig));
331                }
332                catch (IOException e1) {
333                    LOG.warn("Configuration parse error in:" + strConfig);
334                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
335                }
336    
337                // copy configuration properties in the coordElem to the runConf
338                XConfiguration.copy(localConf, runConf);
339            }
340    
341            // Step 3: Extract value of 'app-path' in coordElem, save it as a
342            // new property called 'oozie.coord.application.path', and normalize.
343            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
344            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
345            // Normalize coordinator appPath here;
346            try {
347                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
348            }
349            catch (IOException e) {
350                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
351            }
352            return runConf;
353        }
354    
355        /* (non-Javadoc)
356         * @see org.apache.oozie.command.TransitionXCommand#getJob()
357         */
358        @Override
359        public Job getJob() {
360            return bundleJob;
361        }
362    
363        /* (non-Javadoc)
364         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
365         */
366        @Override
367        public void updateJob() throws CommandException {
368            updateList.add(bundleJob);
369        }
370    }