001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.text.ParseException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.regex.Matcher; 031import java.util.regex.Pattern; 032 033import javax.persistence.EntityManager; 034import javax.persistence.Query; 035 036import org.apache.oozie.BulkResponseInfo; 037import org.apache.oozie.BundleJobBean; 038import org.apache.oozie.CoordinatorActionBean; 039import org.apache.oozie.CoordinatorJobBean; 040import org.apache.oozie.ErrorCode; 041import org.apache.oozie.StringBlob; 042import org.apache.oozie.client.BundleJob; 043import org.apache.oozie.client.CoordinatorAction; 044import org.apache.oozie.client.CoordinatorJob; 045import org.apache.oozie.client.rest.BulkResponseImpl; 046import org.apache.oozie.service.Services; 047import org.apache.oozie.util.DateUtils; 048import org.apache.oozie.util.ParamChecker; 049 050/** 051 * The query executor class for bulk monitoring queries i.e. debugging bundle coord actions directly 052 */ 053public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { 054 private Map<String, List<String>> bulkFilter; 055 // defaults 056 private int start = 1; 057 private int len = 50; 058 private enum PARAM_TYPE { 059 ID, NAME 060 } 061 062 public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) { 063 ParamChecker.notNull(bulkFilter, "bulkFilter"); 064 this.bulkFilter = bulkFilter; 065 this.start = start; 066 this.len = len; 067 } 068 069 @Override 070 public String getName() { 071 return "BulkJPAExecutor"; 072 } 073 074 @Override 075 public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException { 076 List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>(); 077 Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>(); 078 079 try { 080 List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD); 081 List<String> statuses = bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS); 082 List<String> params = new ArrayList<String>(); 083 084 // Lightweight Query 1 on Bundle level to fetch the bundle job(s) 085 // corresponding to names or ids 086 List<BundleJobBean> bundleBeans = bundleQuery(em); 087 088 // Join query between coordinator job and coordinator action tables 089 // to get entries for specific bundleId only 090 String conditions = actionQuery(coords, params, statuses, em, bundleBeans, actionTimes, responseList); 091 092 // Query to get the count of records 093 long total = countQuery(statuses, params, conditions, em, bundleBeans, actionTimes); 094 095 BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total); 096 return bulk; 097 } 098 catch (Exception e) { 099 throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); 100 } 101 } 102 103 /** 104 * build the bundle level query to get bundle beans for the specified ids or appnames 105 * @param em 106 * @return List BundleJobBeans 107 * @throws JPAExecutorException 108 */ 109 @SuppressWarnings("unchecked") 110 private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException { 111 Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY"); 112 StringBuilder bundleQuery = new StringBuilder(q.toString()); 113 114 StringBuilder whereClause = null; 115 List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE); 116 if (bundles != null) { 117 PARAM_TYPE type = getParamType(bundles.get(0), 'B'); 118 if (type == PARAM_TYPE.NAME) { 119 whereClause = inClause(bundles.size(), "appName", 'b', "bundles"); 120 } 121 else if (type == PARAM_TYPE.ID) { 122 whereClause = inClause(bundles.size(), "id", 'b', "bundles"); 123 } 124 125 // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...) 126 bundleQuery.append(whereClause.replace(whereClause.indexOf("AND"), whereClause.indexOf("AND") + 3, "WHERE")); 127 Query tmp = em.createQuery(bundleQuery.toString()); 128 129 fillParameters(tmp, "bundles", bundles); 130 131 List<Object[]> bundleObjs = (List<Object[]>) tmp.getResultList(); 132 if (bundleObjs.isEmpty()) { 133 throw new JPAExecutorException(ErrorCode.E0603, "No entries found for given bundle(s)"); 134 } 135 136 List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>(); 137 for (Object[] bundleElem : bundleObjs) { 138 bundleBeans.add(constructBundleBean(bundleElem)); 139 } 140 return bundleBeans; 141 } 142 return null; 143 } 144 145 /** 146 * Validate and determine whether passed param is job-id or appname 147 * @param id 148 * @param job 149 * @return PARAM_TYPE 150 */ 151 private PARAM_TYPE getParamType(String id, char job) { 152 Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Services.get().getSystemId() + "-" + job); 153 Matcher m = p.matcher(id); 154 if (m.matches()) { 155 return PARAM_TYPE.ID; 156 } 157 return PARAM_TYPE.NAME; 158 } 159 160 /** 161 * Compose the coord action level query comprising bundle id/appname filter and coord action 162 * status filter (if specified) and start-time or nominal-time filter (if specified) 163 * @param em 164 * @param bundles 165 * @param times 166 * @param responseList 167 * @return Query string 168 * @throws ParseException 169 */ 170 @SuppressWarnings("unchecked") 171 private String actionQuery(final List<String> coords, final List<String> params, List<String> statuses, EntityManager em, 172 List<BundleJobBean> bundles, Map<String, Timestamp> times, List<BulkResponseImpl> responseList) 173 throws ParseException { 174 Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY"); 175 StringBuilder getActions = new StringBuilder(q.toString()); 176 int offset = getActions.indexOf("ORDER"); 177 StringBuilder conditionClause = new StringBuilder(); 178 179 // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id 180 // AND c.bundleId = :bundleId AND c.appName/id IN (...) 181 182 if (coords != null) { 183 PARAM_TYPE type = getParamType(coords.get(0), 'C'); 184 if (type == PARAM_TYPE.NAME) { 185 conditionClause.append(inClause(coords.size(), "appName", 'c', "param")); 186 params.addAll(coords); 187 } 188 else if (type == PARAM_TYPE.ID) { 189 conditionClause.append(inClause(coords.size(), "id", 'c', "param")); 190 params.addAll(coords); 191 } 192 } 193 // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id 194 // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...) 195 conditionClause.append(statusClause(statuses)); 196 197 offset = getActions.indexOf("ORDER"); 198 getActions.insert(offset - 1, conditionClause); 199 200 // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id 201 // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...) 202 // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated 203 // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal 204 timesClause(getActions, offset, times); 205 q = em.createQuery(getActions.toString()); 206 Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); 207 while (iter.hasNext()) { 208 Entry<String, Timestamp> time = iter.next(); 209 q.setParameter(time.getKey(), time.getValue()); 210 } 211 // pagination 212 q.setFirstResult(start - 1); 213 q.setMaxResults(len); 214 215 if (coords != null) { 216 fillParameters(q, "param", coords); 217 } 218 219 if (statuses != null) { 220 fillParameters(q, "status", statuses); 221 } 222 223 // repeatedly execute above query for each bundle 224 for (BundleJobBean bundle : bundles) { 225 q.setParameter("bundleId", bundle.getId()); 226 List<Object[]> response = q.getResultList(); 227 for (Object[] r : response) { 228 BulkResponseImpl br = getResponseFromObject(bundle, r); 229 responseList.add(br); 230 } 231 } 232 return q.toString(); 233 } 234 235 /** 236 * Get total number of records for use with offset and len in API 237 * @param clause 238 * @param em 239 * @param bundles 240 * @return total count of coord actions 241 */ 242 private long countQuery(List<String> statuses, List<String> params, String clause, EntityManager em, 243 List<BundleJobBean> bundles, Map<String, Timestamp> times) { 244 Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY"); 245 StringBuilder getTotal = new StringBuilder(q.toString() + " "); 246 // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c 247 // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job 248 // and action status and times 249 getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER"))); 250 int offset = getTotal.indexOf("bundleId"); 251 List<String> bundleIds = new ArrayList<String>(); 252 for (BundleJobBean bundle : bundles) { 253 bundleIds.add(bundle.getId()); 254 } 255 // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ... 256 // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list 257 getTotal = getTotal.replace(offset - 6, offset + 20, inClause(bundleIds.size(), "bundleId", 'c', "count").toString()); 258 q = em.createQuery(getTotal.toString()); 259 260 fillParameters(q, "count", bundleIds); 261 262 if (statuses != null) { 263 fillParameters(q, "status", statuses); 264 } 265 266 if (params != null) { 267 fillParameters(q, "param", params); 268 } 269 270 Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); 271 while (iter.hasNext()) { 272 Entry<String, Timestamp> time = iter.next(); 273 q.setParameter(time.getKey(), time.getValue()); 274 } 275 long total = ((Long) q.getSingleResult()).longValue(); 276 return total; 277 } 278 279 // Form the where clause to filter by coordinator appname/id 280 private StringBuilder inClause(int noOfValues, String col, char type, String paramPrefix) { 281 StringBuilder sb = new StringBuilder(); 282 boolean firstVal = true; 283 284 for (int i = 0; i < noOfValues; i++) { 285 if (firstVal) { 286 sb.append(" AND " + type + "." + col + " IN (:" + paramPrefix + i); 287 firstVal = false; 288 } 289 else { 290 sb.append(", :" + paramPrefix + i); 291 } 292 } 293 if (!firstVal) { 294 sb.append(") "); 295 } 296 297 return sb; 298 } 299 300 // Form the where clause to filter by coord action status 301 private StringBuilder statusClause(List<String> statuses) { 302 StringBuilder sb = new StringBuilder(); 303 304 if (statuses != null) { 305 sb = inClause(statuses.size(), "statusStr", 'a', "status"); 306 } 307 308 if (sb.length() == 0) { // statuses was null. adding default 309 sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') "); 310 } 311 312 return sb; 313 } 314 315 private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException { 316 Timestamp ts = null; 317 List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH); 318 if (times != null) { 319 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 320 sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated"); 321 eachTime.put("startCreated", ts); 322 } 323 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH); 324 if (times != null) { 325 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 326 sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated"); 327 eachTime.put("endCreated", ts); 328 } 329 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH); 330 if (times != null) { 331 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 332 sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal"); 333 eachTime.put("startNominal", ts); 334 } 335 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH); 336 if (times != null) { 337 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 338 sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal"); 339 eachTime.put("endNominal", ts); 340 } 341 } 342 343 private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) { 344 BulkResponseImpl bean = new BulkResponseImpl(); 345 CoordinatorJobBean coordBean = new CoordinatorJobBean(); 346 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 347 if (arr[0] != null) { 348 actionBean.setId((String) arr[0]); 349 } 350 if (arr[1] != null) { 351 actionBean.setActionNumber((Integer) arr[1]); 352 } 353 if (arr[2] != null) { 354 actionBean.setErrorCode((String) arr[2]); 355 } 356 if (arr[3] != null) { 357 actionBean.setErrorMessage((String) arr[3]); 358 } 359 if (arr[4] != null) { 360 actionBean.setExternalId((String) arr[4]); 361 } 362 if (arr[5] != null) { 363 actionBean.setExternalStatus((String) arr[5]); 364 } 365 if (arr[6] != null) { 366 actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6])); 367 } 368 if (arr[7] != null) { 369 actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7])); 370 } 371 if (arr[8] != null) { 372 actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8])); 373 } 374 if (arr[9] != null) { 375 actionBean.setMissingDependenciesBlob((StringBlob) arr[9]); 376 } 377 if (arr[10] != null) { 378 coordBean.setId((String) arr[10]); 379 actionBean.setJobId((String) arr[10]); 380 } 381 if (arr[11] != null) { 382 coordBean.setAppName((String) arr[11]); 383 } 384 if (arr[12] != null) { 385 coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12])); 386 } 387 bean.setBundle(bundleBean); 388 bean.setCoordinator(coordBean); 389 bean.setAction(actionBean); 390 return bean; 391 } 392 393 private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException { 394 BundleJobBean bean = new BundleJobBean(); 395 if (barr[0] != null) { 396 bean.setId((String) barr[0]); 397 } 398 else { 399 throw new JPAExecutorException(ErrorCode.E0603, 400 "bundleId returned by query is null - cannot retrieve bulk results"); 401 } 402 if (barr[1] != null) { 403 bean.setAppName((String) barr[1]); 404 } 405 if (barr[2] != null) { 406 bean.setStatus(BundleJob.Status.valueOf((String) barr[2])); 407 } 408 if (barr[3] != null) { 409 bean.setUser((String) barr[3]); 410 } 411 return bean; 412 } 413 414 private void fillParameters(Query query, String prefix, List<?> values) { 415 for (int i = 0; i < values.size(); i++) { 416 query.setParameter(prefix + i, values.get(i)); 417 } 418 } 419 420 // null safeguard 421 public static List<String> nullToEmpty(List<String> input) { 422 return input == null ? Collections.<String> emptyList() : input; 423 } 424 425}