001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.HashMap;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.oozie.BundleActionBean;
030    import org.apache.oozie.BundleJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.XException;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.client.rest.JsonBean;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.PreconditionException;
038    import org.apache.oozie.command.StartTransitionXCommand;
039    import org.apache.oozie.command.coord.CoordSubmitXCommand;
040    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043    import org.apache.oozie.executor.jpa.JPAExecutorException;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.Services;
046    import org.apache.oozie.util.JobUtils;
047    import org.apache.oozie.util.LogUtils;
048    import org.apache.oozie.util.ParamChecker;
049    import org.apache.oozie.util.XConfiguration;
050    import org.apache.oozie.util.XmlUtils;
051    import org.jdom.Attribute;
052    import org.jdom.Element;
053    import org.jdom.JDOMException;
054    
055    /**
056     * The command to start Bundle job
057     */
058    public class BundleStartXCommand extends StartTransitionXCommand {
059        private final String jobId;
060        private BundleJobBean bundleJob;
061        private JPAService jpaService = null;
062    
063        /**
064         * The constructor for class {@link BundleStartXCommand}
065         *
066         * @param jobId the bundle job id
067         */
068        public BundleStartXCommand(String jobId) {
069            super("bundle_start", "bundle_start", 1);
070            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
071        }
072    
073        /**
074         * The constructor for class {@link BundleStartXCommand}
075         *
076         * @param jobId the bundle job id
077         * @param dryrun true if dryrun is enable
078         */
079        public BundleStartXCommand(String jobId, boolean dryrun) {
080            super("bundle_start", "bundle_start", 1, dryrun);
081            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
082        }
083    
084        /* (non-Javadoc)
085         * @see org.apache.oozie.command.XCommand#getEntityKey()
086         */
087        @Override
088        public String getEntityKey() {
089            return jobId;
090        }
091    
092        /* (non-Javadoc)
093         * @see org.apache.oozie.command.XCommand#isLockRequired()
094         */
095        @Override
096        protected boolean isLockRequired() {
097            return true;
098        }
099    
100        /* (non-Javadoc)
101         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
102         */
103        @Override
104        protected void verifyPrecondition() throws CommandException, PreconditionException {
105            eagerVerifyPrecondition();
106        }
107    
108        /* (non-Javadoc)
109         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
110         */
111        @Override
112        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
113            if (bundleJob.getStatus() != Job.Status.PREP) {
114                String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
115                LOG.info(msg);
116                throw new PreconditionException(ErrorCode.E1100, msg);
117            }
118        }
119        /* (non-Javadoc)
120         * @see org.apache.oozie.command.XCommand#loadState()
121         */
122        @Override
123        public void loadState() throws CommandException {
124            eagerLoadState();
125        }
126    
127        /* (non-Javadoc)
128         * @see org.apache.oozie.command.XCommand#eagerLoadState()
129         */
130        @Override
131        public void eagerLoadState() throws CommandException {
132            try {
133                jpaService = Services.get().get(JPAService.class);
134    
135                if (jpaService != null) {
136                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
137                    LogUtils.setLogInfo(bundleJob, logInfo);
138                    super.setJob(bundleJob);
139    
140                }
141                else {
142                    throw new CommandException(ErrorCode.E0610);
143                }
144            }
145            catch (XException ex) {
146                throw new CommandException(ex);
147            }
148        }
149    
150        /* (non-Javadoc)
151         * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
152         */
153        @Override
154        public void StartChildren() throws CommandException {
155            LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
156            insertBundleActions();
157            startCoordJobs();
158            LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
159        }
160    
161        /* (non-Javadoc)
162         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
163         */
164        @Override
165        public void notifyParent() {
166        }
167    
168        /* (non-Javadoc)
169         * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
170         */
171        @Override
172        public void performWrites() throws CommandException {
173            try {
174                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
175            }
176            catch (JPAExecutorException e) {
177                throw new CommandException(e);
178            }
179        }
180    
181        /**
182         * Insert bundle actions
183         *
184         * @throws CommandException thrown if failed to create bundle actions
185         */
186        @SuppressWarnings("unchecked")
187        private void insertBundleActions() throws CommandException {
188            if (bundleJob != null) {
189                Map<String, Boolean> map = new HashMap<String, Boolean>();
190                try {
191                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
192                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
193                    for (Element elem : coordElems) {
194                        Attribute name = elem.getAttribute("name");
195                        Attribute critical = elem.getAttribute("critical");
196                        if (name != null) {
197                            if (map.containsKey(name.getValue())) {
198                                throw new CommandException(ErrorCode.E1304, name);
199                            }
200                            boolean isCritical = false;
201                            if (critical != null && Boolean.parseBoolean(critical.getValue())) {
202                                isCritical = true;
203                            }
204                            map.put(name.getValue(), isCritical);
205                        }
206                        else {
207                            throw new CommandException(ErrorCode.E1305);
208                        }
209                    }
210                }
211                catch (JDOMException jex) {
212                    throw new CommandException(ErrorCode.E1301, jex);
213                }
214    
215                // if there is no coordinator for this bundle, failed it.
216                if (map.isEmpty()) {
217                    bundleJob.setStatus(Job.Status.FAILED);
218                    bundleJob.resetPending();
219                    try {
220                        jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
221                    }
222                    catch (JPAExecutorException jex) {
223                        throw new CommandException(jex);
224                    }
225    
226                    LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
227                    throw new CommandException(ErrorCode.E1318, jobId);
228                }
229    
230                for (Entry<String, Boolean> coordName : map.entrySet()) {
231                    BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
232                    insertList.add(action);
233                }
234            }
235            else {
236                throw new CommandException(ErrorCode.E0604, jobId);
237            }
238        }
239    
240        private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
241            BundleActionBean action = new BundleActionBean();
242            action.setBundleActionId(jobId + "_" + coordName);
243            action.setBundleId(jobId);
244            action.setCoordName(coordName);
245            action.setStatus(Job.Status.PREP);
246            action.setLastModifiedTime(new Date());
247            if (isCritical) {
248                action.setCritical();
249            }
250            else {
251                action.resetCritical();
252            }
253            return action;
254        }
255    
256        /**
257         * Start Coord Jobs
258         *
259         * @throws CommandException thrown if failed to start coord jobs
260         */
261        @SuppressWarnings("unchecked")
262        private void startCoordJobs() throws CommandException {
263            if (bundleJob != null) {
264                try {
265                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
266                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
267                    for (Element coordElem : coordElems) {
268                        Attribute name = coordElem.getAttribute("name");
269                        Configuration coordConf = mergeConfig(coordElem);
270                        coordConf.set(OozieClient.BUNDLE_ID, jobId);
271    
272                        queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
273    
274                    }
275                    updateBundleAction();
276                }
277                catch (JDOMException jex) {
278                    throw new CommandException(ErrorCode.E1301, jex);
279                }
280                catch (JPAExecutorException je) {
281                    throw new CommandException(je);
282                }
283            }
284            else {
285                throw new CommandException(ErrorCode.E0604, jobId);
286            }
287        }
288    
289        private void updateBundleAction() throws JPAExecutorException {
290            for(JsonBean bAction : insertList) {
291                BundleActionBean action = (BundleActionBean) bAction;
292                action.incrementAndGetPending();
293                action.setLastModifiedTime(new Date());
294            }
295        }
296    
297        /**
298         * Merge Bundle job config and the configuration from the coord job to pass
299         * to Coord Engine
300         *
301         * @param coordElem the coordinator configuration
302         * @return Configuration merged configuration
303         * @throws CommandException thrown if failed to merge configuration
304         */
305        private Configuration mergeConfig(Element coordElem) throws CommandException {
306            String jobConf = bundleJob.getConf();
307            // Step 1: runConf = jobConf
308            Configuration runConf = null;
309            try {
310                runConf = new XConfiguration(new StringReader(jobConf));
311            }
312            catch (IOException e1) {
313                LOG.warn("Configuration parse error in:" + jobConf);
314                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
315            }
316            // Step 2: Merge local properties into runConf
317            // extract 'property' tags under 'configuration' block in the coordElem
318            // convert Element to XConfiguration
319            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
320    
321            if (localConfigElement != null) {
322                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
323                Configuration localConf;
324                try {
325                    localConf = new XConfiguration(new StringReader(strConfig));
326                }
327                catch (IOException e1) {
328                    LOG.warn("Configuration parse error in:" + strConfig);
329                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
330                }
331    
332                // copy configuration properties in the coordElem to the runConf
333                XConfiguration.copy(localConf, runConf);
334            }
335    
336            // Step 3: Extract value of 'app-path' in coordElem, save it as a
337            // new property called 'oozie.coord.application.path', and normalize.
338            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
339            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
340            // Normalize coordinator appPath here;
341            try {
342                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
343            }
344            catch (IOException e) {
345                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
346            }
347            return runConf;
348        }
349    
350        /* (non-Javadoc)
351         * @see org.apache.oozie.command.TransitionXCommand#getJob()
352         */
353        @Override
354        public Job getJob() {
355            return bundleJob;
356        }
357    
358        /* (non-Javadoc)
359         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
360         */
361        @Override
362        public void updateJob() throws CommandException {
363            updateList.add(bundleJob);
364        }
365    }