This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.executor.jpa;
019
020 import java.sql.Timestamp;
021 import java.text.ParseException;
022 import java.util.ArrayList;
023 import java.util.Collections;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Map.Entry;
029
030 import javax.persistence.EntityManager;
031 import javax.persistence.Query;
032
033 import org.apache.oozie.BundleJobBean;
034 import org.apache.oozie.client.BundleJob;
035 import org.apache.oozie.client.CoordinatorJob;
036 import org.apache.oozie.client.rest.BulkResponseImpl;
037 import org.apache.oozie.BulkResponseInfo;
038 import org.apache.oozie.CoordinatorActionBean;
039 import org.apache.oozie.CoordinatorJobBean;
040 import org.apache.oozie.ErrorCode;
041 import org.apache.oozie.client.CoordinatorAction;
042 import org.apache.oozie.util.DateUtils;
043 import org.apache.oozie.util.ParamChecker;
044
045 /**
046 * The query executor class for bulk monitoring queries i.e. debugging bundle ->
047 * coord actions directly
048 */
049 public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
050 private Map<String, List<String>> bulkFilter;
051 // defaults
052 private int start = 1;
053 private int len = 50;
054
055 public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
056 ParamChecker.notNull(bulkFilter, "bulkFilter");
057 this.bulkFilter = bulkFilter;
058 this.start = start;
059 this.len = len;
060 }
061
062 /*
063 * (non-Javadoc)
064 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
065 */
066 @Override
067 public String getName() {
068 return "BulkJPAExecutor";
069 }
070
071 /*
072 * (non-Javadoc)
073 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
074 */
075 @Override
076 public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
077 List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
078 Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();
079
080 try {
081 // Lightweight Query 1 on Bundle level to fetch the bundle job
082 // corresponding to name
083 BundleJobBean bundleBean = bundleQuery(em);
084
085 // Join query between coordinator job and coordinator action tables
086 // to get entries for specific bundleId only
087 String conditions = actionQuery(em, bundleBean, actionTimes, responseList);
088
089 // Query to get the count of records
090 long total = countQuery(conditions, em, bundleBean, actionTimes);
091
092 BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total);
093 return bulk;
094 }
095 catch (Exception e) {
096 throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
097 }
098 }
099
100 @SuppressWarnings("unchecked")
101 private BundleJobBean bundleQuery(EntityManager em) throws JPAExecutorException {
102 BundleJobBean bundleBean = new BundleJobBean();
103 String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0);
104 Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
105 q.setParameter("appName", bundleName);
106 List<Object[]> bundles = (List<Object[]>) q.getResultList();
107 if (bundles.isEmpty()) {
108 throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: "
109 + bundleName);
110 }
111 if (bundles.size() > 1) { // more than one bundles running with same
112 // name - ERROR. Fail fast
113 throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: "
114 + bundleName);
115 }
116 bundleBean = getBeanForBundleJob(bundles.get(0), bundleName);
117 return bundleBean;
118 }
119
120 @SuppressWarnings("unchecked")
121 private String actionQuery(EntityManager em, BundleJobBean bundleBean,
122 Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException {
123 Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
124 StringBuilder getActions = new StringBuilder(q.toString());
125 StringBuilder conditionClause = new StringBuilder();
126 conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME)));
127 conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
128 int offset = getActions.indexOf("ORDER");
129 getActions.insert(offset - 1, conditionClause);
130 timesClause(getActions, offset, times);
131 q = em.createQuery(getActions.toString());
132 Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
133 while (iter.hasNext()) {
134 Entry<String, Timestamp> time = iter.next();
135 q.setParameter(time.getKey(), time.getValue());
136 }
137 q.setParameter("bundleId", bundleBean.getId());
138 // pagination
139 q.setFirstResult(start - 1);
140 q.setMaxResults(len);
141
142 List<Object[]> response = q.getResultList();
143 for (Object[] r : response) {
144 BulkResponseImpl br = getResponseFromObject(bundleBean, r);
145 responseList.add(br);
146 }
147 return q.toString();
148 }
149
150 private long countQuery(String clause, EntityManager em, BundleJobBean bundleBean, Map<String, Timestamp> times) {
151 Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
152 StringBuilder getTotal = new StringBuilder(q.toString() + " ");
153 getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER")));
154 q = em.createQuery(getTotal.toString());
155 q.setParameter("bundleId", bundleBean.getId());
156 Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
157 while (iter.hasNext()) {
158 Entry<String, Timestamp> time = iter.next();
159 q.setParameter(time.getKey(), time.getValue());
160 }
161 long total = ((Long) q.getSingleResult()).longValue();
162 return total;
163 }
164
165 // Form the where clause to filter by coordinator names
166 private StringBuilder coordNamesClause(List<String> coordNames) {
167 StringBuilder sb = new StringBuilder();
168 boolean firstVal = true;
169 for (String name : nullToEmpty(coordNames)) {
170 if (firstVal) {
171 sb.append(" AND c.appName IN (\'" + name + "\'");
172 firstVal = false;
173 }
174 else {
175 sb.append(",\'" + name + "\'");
176 }
177 }
178 if (!firstVal) {
179 sb.append(") ");
180 }
181 return sb;
182 }
183
184 // Form the where clause to filter by coord action status
185 private StringBuilder statusClause(List<String> statuses) {
186 StringBuilder sb = new StringBuilder();
187 boolean firstVal = true;
188 for (String status : nullToEmpty(statuses)) {
189 if (firstVal) {
190 sb.append(" AND a.status IN (\'" + status + "\'");
191 firstVal = false;
192 }
193 else {
194 sb.append(",\'" + status + "\'");
195 }
196 }
197 if (!firstVal) {
198 sb.append(") ");
199 }
200 else { // statuses was null. adding default
201 sb.append(" AND a.status IN ('KILLED', 'FAILED') ");
202 }
203 return sb;
204 }
205
206 private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
207 Timestamp ts = null;
208 List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
209 if (times != null) {
210 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
211 sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated");
212 eachTime.put("startCreated", ts);
213 }
214 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
215 if (times != null) {
216 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
217 sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated");
218 eachTime.put("endCreated", ts);
219 }
220 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
221 if (times != null) {
222 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
223 sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal");
224 eachTime.put("startNominal", ts);
225 }
226 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
227 if (times != null) {
228 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
229 sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal");
230 eachTime.put("endNominal", ts);
231 }
232 }
233
234 private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
235 BulkResponseImpl bean = new BulkResponseImpl();
236 CoordinatorJobBean coordBean = new CoordinatorJobBean();
237 CoordinatorActionBean actionBean = new CoordinatorActionBean();
238 if (arr[0] != null) {
239 actionBean.setId((String) arr[0]);
240 }
241 if (arr[1] != null) {
242 actionBean.setActionNumber((Integer) arr[1]);
243 }
244 if (arr[2] != null) {
245 actionBean.setErrorCode((String) arr[2]);
246 }
247 if (arr[3] != null) {
248 actionBean.setErrorMessage((String) arr[3]);
249 }
250 if (arr[4] != null) {
251 actionBean.setExternalId((String) arr[4]);
252 }
253 if (arr[5] != null) {
254 actionBean.setExternalStatus((String) arr[5]);
255 }
256 if (arr[6] != null) {
257 actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
258 }
259 if (arr[7] != null) {
260 actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
261 }
262 if (arr[8] != null) {
263 actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
264 }
265 if (arr[9] != null) {
266 actionBean.setMissingDependencies((String) arr[9]);
267 }
268 if (arr[10] != null) {
269 coordBean.setId((String) arr[10]);
270 actionBean.setJobId((String) arr[10]);
271 }
272 if (arr[11] != null) {
273 coordBean.setAppName((String) arr[11]);
274 }
275 if (arr[12] != null) {
276 coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
277 }
278 bean.setBundle(bundleBean);
279 bean.setCoordinator(coordBean);
280 bean.setAction(actionBean);
281 return bean;
282 }
283
284 private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws JPAExecutorException {
285 BundleJobBean bean = new BundleJobBean();
286 if (barr[0] != null) {
287 bean.setId((String) barr[0]);
288 }
289 else {
290 throw new JPAExecutorException(ErrorCode.E0603,
291 "bundleId returned by query is null - cannot retrieve bulk results");
292 }
293 bean.setAppName(name);
294 if (barr[1] != null) {
295 bean.setStatus(BundleJob.Status.valueOf((String) barr[1]));
296 }
297 return bean;
298 }
299
300 // null safeguard
301 public static List<String> nullToEmpty(List<String> input) {
302 return input == null ? Collections.<String> emptyList() : input;
303 }
304
305 }