001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.HashMap;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.oozie.BundleActionBean;
030    import org.apache.oozie.BundleJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.XException;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.PreconditionException;
037    import org.apache.oozie.command.StartTransitionXCommand;
038    import org.apache.oozie.command.coord.CoordSubmitXCommand;
039    import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
044    import org.apache.oozie.executor.jpa.JPAExecutorException;
045    import org.apache.oozie.service.JPAService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.JobUtils;
048    import org.apache.oozie.util.LogUtils;
049    import org.apache.oozie.util.ParamChecker;
050    import org.apache.oozie.util.XConfiguration;
051    import org.apache.oozie.util.XmlUtils;
052    import org.jdom.Attribute;
053    import org.jdom.Element;
054    import org.jdom.JDOMException;
055    
056    /**
057     * The command to start Bundle job
058     */
059    public class BundleStartXCommand extends StartTransitionXCommand {
060        private final String jobId;
061        private BundleJobBean bundleJob;
062        private JPAService jpaService = null;
063    
064        /**
065         * The constructor for class {@link BundleStartXCommand}
066         *
067         * @param jobId the bundle job id
068         */
069        public BundleStartXCommand(String jobId) {
070            super("bundle_start", "bundle_start", 1);
071            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
072        }
073    
074        /**
075         * The constructor for class {@link BundleStartXCommand}
076         *
077         * @param jobId the bundle job id
078         * @param dryrun true if dryrun is enable
079         */
080        public BundleStartXCommand(String jobId, boolean dryrun) {
081            super("bundle_start", "bundle_start", 1, dryrun);
082            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
083        }
084    
085        /* (non-Javadoc)
086         * @see org.apache.oozie.command.XCommand#getEntityKey()
087         */
088        @Override
089        public String getEntityKey() {
090            return jobId;
091        }
092    
093        /* (non-Javadoc)
094         * @see org.apache.oozie.command.XCommand#isLockRequired()
095         */
096        @Override
097        protected boolean isLockRequired() {
098            return true;
099        }
100    
101        /* (non-Javadoc)
102         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
103         */
104        @Override
105        protected void verifyPrecondition() throws CommandException, PreconditionException {
106            eagerVerifyPrecondition();
107        }
108    
109        /* (non-Javadoc)
110         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
111         */
112        @Override
113        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
114            if (bundleJob.getStatus() != Job.Status.PREP) {
115                String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
116                LOG.info(msg);
117                throw new PreconditionException(ErrorCode.E1100, msg);
118            }
119        }
120        /* (non-Javadoc)
121         * @see org.apache.oozie.command.XCommand#loadState()
122         */
123        @Override
124        public void loadState() throws CommandException {
125            eagerLoadState();
126        }
127    
128        /* (non-Javadoc)
129         * @see org.apache.oozie.command.XCommand#eagerLoadState()
130         */
131        @Override
132        public void eagerLoadState() throws CommandException {
133            try {
134                jpaService = Services.get().get(JPAService.class);
135    
136                if (jpaService != null) {
137                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
138                    LogUtils.setLogInfo(bundleJob, logInfo);
139                    super.setJob(bundleJob);
140    
141                }
142                else {
143                    throw new CommandException(ErrorCode.E0610);
144                }
145            }
146            catch (XException ex) {
147                throw new CommandException(ex);
148            }
149        }
150    
151        /* (non-Javadoc)
152         * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
153         */
154        @Override
155        public void StartChildren() throws CommandException {
156            LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
157            insertBundleActions();
158            startCoordJobs();
159            LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
160        }
161    
162        /* (non-Javadoc)
163         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
164         */
165        @Override
166        public void notifyParent() {
167        }
168    
169        /**
170         * Insert bundle actions
171         *
172         * @throws CommandException thrown if failed to create bundle actions
173         */
174        @SuppressWarnings("unchecked")
175        private void insertBundleActions() throws CommandException {
176            if (bundleJob != null) {
177                Map<String, Boolean> map = new HashMap<String, Boolean>();
178                try {
179                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
180                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
181                    for (Element elem : coordElems) {
182                        Attribute name = elem.getAttribute("name");
183                        Attribute critical = elem.getAttribute("critical");
184                        if (name != null) {
185                            if (map.containsKey(name.getValue())) {
186                                throw new CommandException(ErrorCode.E1304, name);
187                            }
188                            boolean isCritical = false;
189                            if (critical != null && Boolean.parseBoolean(critical.getValue())) {
190                                isCritical = true;
191                            }
192                            map.put(name.getValue(), isCritical);
193                        }
194                        else {
195                            throw new CommandException(ErrorCode.E1305);
196                        }
197                    }
198                }
199                catch (JDOMException jex) {
200                    throw new CommandException(ErrorCode.E1301, jex);
201                }
202    
203                try {
204                    // if there is no coordinator for this bundle, failed it.
205                    if (map.isEmpty()) {
206                        bundleJob.setStatus(Job.Status.FAILED);
207                        bundleJob.resetPending();
208                        jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
209                        LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
210                        throw new CommandException(ErrorCode.E1318, jobId);
211                    }
212    
213                    for (Entry<String, Boolean> coordName : map.entrySet()) {
214                        BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
215    
216                        jpaService.execute(new BundleActionInsertJPAExecutor(action));
217                    }
218                }
219                catch (JPAExecutorException je) {
220                    throw new CommandException(je);
221                }
222    
223            }
224            else {
225                throw new CommandException(ErrorCode.E0604, jobId);
226            }
227        }
228    
229        private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
230            BundleActionBean action = new BundleActionBean();
231            action.setBundleActionId(jobId + "_" + coordName);
232            action.setBundleId(jobId);
233            action.setCoordName(coordName);
234            action.setStatus(Job.Status.PREP);
235            action.setLastModifiedTime(new Date());
236            if (isCritical) {
237                action.setCritical();
238            }
239            else {
240                action.resetCritical();
241            }
242            return action;
243        }
244    
245        /**
246         * Start Coord Jobs
247         *
248         * @throws CommandException thrown if failed to start coord jobs
249         */
250        @SuppressWarnings("unchecked")
251        private void startCoordJobs() throws CommandException {
252            if (bundleJob != null) {
253                try {
254                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
255                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
256                    for (Element coordElem : coordElems) {
257                        Attribute name = coordElem.getAttribute("name");
258                        Configuration coordConf = mergeConfig(coordElem);
259                        coordConf.set(OozieClient.BUNDLE_ID, jobId);
260    
261                        queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
262    
263                        updateBundleAction(name.getValue());
264                    }
265                }
266                catch (JDOMException jex) {
267                    throw new CommandException(ErrorCode.E1301, jex);
268                }
269                catch (JPAExecutorException je) {
270                    throw new CommandException(je);
271                }
272            }
273            else {
274                throw new CommandException(ErrorCode.E0604, jobId);
275            }
276        }
277    
278        private void updateBundleAction(String coordName) throws JPAExecutorException {
279            BundleActionBean action = jpaService.execute(new BundleActionGetJPAExecutor(jobId, coordName));
280            action.incrementAndGetPending();
281            action.setLastModifiedTime(new Date());
282            jpaService.execute(new BundleActionUpdateJPAExecutor(action));
283        }
284    
285        /**
286         * Merge Bundle job config and the configuration from the coord job to pass
287         * to Coord Engine
288         *
289         * @param coordElem the coordinator configuration
290         * @return Configuration merged configuration
291         * @throws CommandException thrown if failed to merge configuration
292         */
293        private Configuration mergeConfig(Element coordElem) throws CommandException {
294            String jobConf = bundleJob.getConf();
295            // Step 1: runConf = jobConf
296            Configuration runConf = null;
297            try {
298                runConf = new XConfiguration(new StringReader(jobConf));
299            }
300            catch (IOException e1) {
301                LOG.warn("Configuration parse error in:" + jobConf);
302                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
303            }
304            // Step 2: Merge local properties into runConf
305            // extract 'property' tags under 'configuration' block in the coordElem
306            // convert Element to XConfiguration
307            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
308    
309            if (localConfigElement != null) {
310                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
311                Configuration localConf;
312                try {
313                    localConf = new XConfiguration(new StringReader(strConfig));
314                }
315                catch (IOException e1) {
316                    LOG.warn("Configuration parse error in:" + strConfig);
317                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
318                }
319    
320                // copy configuration properties in the coordElem to the runConf
321                XConfiguration.copy(localConf, runConf);
322            }
323    
324            // Step 3: Extract value of 'app-path' in coordElem, save it as a
325            // new property called 'oozie.coord.application.path', and normalize.
326            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
327            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
328            // Normalize coordinator appPath here;
329            try {
330                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
331            }
332            catch (IOException e) {
333                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
334            }
335            return runConf;
336        }
337    
338        /* (non-Javadoc)
339         * @see org.apache.oozie.command.TransitionXCommand#getJob()
340         */
341        @Override
342        public Job getJob() {
343            return bundleJob;
344        }
345    
346        /* (non-Javadoc)
347         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
348         */
349        @Override
350        public void updateJob() throws CommandException {
351            try {
352                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
353            }
354            catch (JPAExecutorException je) {
355                throw new CommandException(je);
356            }
357        }
358    }