/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.executor.jpa;

import java.sql.Timestamp;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.BundleJobBean;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.BulkResponseInfo;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;

/**
 * The query executor class for bulk monitoring queries, i.e. debugging
 * bundle -> coord actions directly.
 * <p>
 * Execution runs three queries in order: a bundle lookup by id or app name,
 * a coordinator job/action join executed once per matching bundle, and a count
 * query over the same conditions.
 */
public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
    private final Map<String, List<String>> bulkFilter;
    // defaults
    private int start = 1;
    private int len = 50;

    private enum PARAM_TYPE {
        ID, NAME
    }

    /**
     * @param bulkFilter filter name to filter values; must not be null
     * @param start 1-based index of the first record to return
     * @param len maximum number of records to return per executed action query
     */
    public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
        ParamChecker.notNull(bulkFilter, "bulkFilter");
        this.bulkFilter = bulkFilter;
        this.start = start;
        this.len = len;
    }

    @Override
    public String getName() {
        return "BulkJPAExecutor";
    }

    /**
     * Run the bundle, action and count queries and assemble the bulk response.
     *
     * @param em entity manager used to create and run the queries
     * @return the collected responses together with start, len and the total count
     * @throws JPAExecutorException (E0603) if any query step fails
     */
    @Override
    public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
        List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
        Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();

        try {
            // Lightweight Query 1 on Bundle level to fetch the bundle job(s)
            // corresponding to names or ids
            List<BundleJobBean> bundleBeans = bundleQuery(em);

            // Join query between coordinator job and coordinator action tables
            // to get entries for specific bundleId only
            String conditions = actionQuery(em, bundleBeans, actionTimes, responseList);

            // Query to get the count of records
            long total = countQuery(conditions, em, bundleBeans, actionTimes);

            return new BulkResponseInfo(responseList, start, len, total);
        }
        catch (JPAExecutorException e) {
            // Already carries an error code and message; re-wrapping it would only
            // duplicate the error prefix in the message.
            throw e;
        }
        catch (Exception e) {
            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
        }
    }

    /**
     * Build the bundle level query to get bundle beans for the specified ids or appnames.
     * The first filter value decides whether the whole list is treated as ids or as names.
     *
     * @param em entity manager
     * @return non-empty list of BundleJobBeans
     * @throws JPAExecutorException if no bundle filter was supplied or no bundle matches it
     */
    @SuppressWarnings("unchecked")
    private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException {
        List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE);
        if (bundles == null || bundles.isEmpty()) {
            // Fail here with a clear message instead of returning null and letting the
            // caller hit a NullPointerException while iterating the bundles.
            throw new JPAExecutorException(ErrorCode.E0603, "No bundle name(s) or id(s) specified in bulk filter");
        }

        Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
        // NOTE(review): relies on the JPA provider's Query.toString() returning the query text - confirm
        StringBuilder bundleQuery = new StringBuilder(q.toString());

        String column = (getParamType(bundles.get(0), 'B') == PARAM_TYPE.ID) ? "id" : "appName";
        StringBuilder whereClause = inClause(bundles, column, 'b');

        // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...)
        // inClause() always starts with " AND "; as the first condition it must read " WHERE ".
        int andIdx = whereClause.indexOf("AND");
        bundleQuery.append(whereClause.replace(andIdx, andIdx + 3, "WHERE"));

        List<Object[]> bundleObjs = (List<Object[]>) em.createQuery(bundleQuery.toString()).getResultList();
        if (bundleObjs.isEmpty()) {
            throw new JPAExecutorException(ErrorCode.E0603, "No entries found for given bundle(s)");
        }

        List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>();
        for (Object[] bundleElem : bundleObjs) {
            bundleBeans.add(constructBundleBean(bundleElem));
        }
        return bundleBeans;
    }

    /**
     * Validate and determine whether passed param is job-id or appname.
     *
     * @param id filter value to classify
     * @param job job type suffix expected at the end of an id ('B' or 'C' at the call sites)
     * @return PARAM_TYPE.ID if the value has the form 7 digits, 15 digits, system id, job suffix
     *         (dash-separated); PARAM_TYPE.NAME otherwise
     */
    private PARAM_TYPE getParamType(String id, char job) {
        // The system id is quoted so that any regex metacharacter in it is matched literally.
        Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Pattern.quote(Services.get().getSystemId()) + "-" + job);
        Matcher m = p.matcher(id);
        return m.matches() ? PARAM_TYPE.ID : PARAM_TYPE.NAME;
    }

    /**
     * Compose the coord action level query comprising bundle id/appname filter and coord action
     * status filter (if specified) and start-time or nominal-time filter (if specified), then run it
     * once per bundle. The same first-result/max-results window is applied to every bundle's query.
     *
     * @param em entity manager
     * @param bundles bundles to query actions for
     * @param times output map, filled with the time parameter names and values that were bound
     * @param responseList output list, one entry appended per returned action row
     * @return Query string
     * @throws ParseException if a time filter value cannot be parsed
     */
    @SuppressWarnings("unchecked")
    private String actionQuery(EntityManager em, List<BundleJobBean> bundles,
            Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException {
        Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
        StringBuilder getActions = new StringBuilder(q.toString());
        StringBuilder conditionClause = new StringBuilder();

        List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD);
        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
        // AND c.bundleId = :bundleId AND c.appName/id IN (...)
        if (coords != null && !coords.isEmpty()) {
            String column = (getParamType(coords.get(0), 'C') == PARAM_TYPE.ID) ? "id" : "appName";
            conditionClause.append(inClause(coords, column, 'c'));
        }
        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
        conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));

        // All extra conditions go just before the ORDER keyword of the named query. The index is
        // taken before any filter value is inserted, so it cannot point into a literal.
        int offset = getActions.indexOf("ORDER");
        getActions.insert(offset - 1, conditionClause);

        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
        // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated
        // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal
        timesClause(getActions, offset, times);
        q = em.createQuery(getActions.toString());
        bindTimeParameters(q, times);
        // pagination
        q.setFirstResult(start - 1);
        q.setMaxResults(len);
        // repeatedly execute above query for each bundle
        for (BundleJobBean bundle : bundles) {
            q.setParameter("bundleId", bundle.getId());
            List<Object[]> response = q.getResultList();
            for (Object[] r : response) {
                responseList.add(getResponseFromObject(bundle, r));
            }
        }
        return q.toString();
    }

    /**
     * Get total number of records for use with offset and len in API.
     *
     * @param clause full action query text as returned by actionQuery()
     * @param em entity manager
     * @param bundles bundles whose ids replace the single :bundleId placeholder
     * @param times time parameter names and values to bind
     * @return total count of coord actions
     */
    private long countQuery(String clause, EntityManager em, List<BundleJobBean> bundles, Map<String, Timestamp> times) {
        Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
        StringBuilder getTotal = new StringBuilder(q.toString() + " ");
        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c
        // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job
        // and action status and times.
        // lastIndexOf: every filter literal was inserted before the ORDER keyword, so the first
        // "ORDER" may sit inside a literal (e.g. an app name) while the last one is the clause itself.
        getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.lastIndexOf("ORDER")));

        List<String> bundleIds = new ArrayList<String>();
        for (BundleJobBean bundle : bundles) {
            bundleIds.add(bundle.getId());
        }
        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ...
        // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list
        // NOTE(review): assumes the action query contains exactly "AND c.bundleId = :bundleId" - confirm
        String beforeBundleId = "AND c.";
        String bundleIdCondition = "bundleId = :bundleId";
        int offset = getTotal.indexOf("bundleId");
        getTotal.replace(offset - beforeBundleId.length(), offset + bundleIdCondition.length(),
                inClause(bundleIds, "bundleId", 'c').toString());

        q = em.createQuery(getTotal.toString());
        bindTimeParameters(q, times);
        return ((Long) q.getSingleResult()).longValue();
    }

    // Bind every collected time parameter (name -> timestamp) on the given query
    private void bindTimeParameters(Query q, Map<String, Timestamp> times) {
        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, Timestamp> time = iter.next();
            q.setParameter(time.getKey(), time.getValue());
        }
    }

    // Form the " AND <type>.<col> IN ('v1','v2',...) " condition for the given values.
    // Returns an empty builder when values is null or empty.
    private StringBuilder inClause(List<String> values, String col, char type) {
        StringBuilder sb = new StringBuilder();
        boolean firstVal = true;
        for (String name : nullToEmpty(values)) {
            if (firstVal) {
                sb.append(" AND ").append(type).append('.').append(col).append(" IN (");
                firstVal = false;
            }
            else {
                sb.append(',');
            }
            sb.append('\'').append(escapeLiteral(name)).append('\'');
        }
        if (!firstVal) {
            sb.append(") ");
        }
        return sb;
    }

    // Double embedded single quotes so that a filter value cannot terminate the quoted
    // literal it is placed in and alter the query.
    private static String escapeLiteral(String value) {
        return String.valueOf(value).replace("'", "''");
    }

    // Form the where clause to filter by coord action status
    private StringBuilder statusClause(List<String> statuses) {
        StringBuilder sb = inClause(statuses, "statusStr", 'a');
        if (sb.length() == 0) { // no statuses given. adding default
            sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') ");
        }
        return sb;
    }

    /**
     * Insert the created-time and nominal-time range conditions that are present in the filter.
     *
     * @param sb query text to insert into
     * @param offset index of the ORDER keyword in the original named query; conditions are inserted
     *        one character before it
     * @param eachTime output map of parameter name to parsed timestamp
     * @throws ParseException if a time filter value cannot be parsed
     */
    private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH,
                " AND a.createdTimestamp >= :startCreated", "startCreated");
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH,
                " AND a.createdTimestamp <= :endCreated", "endCreated");
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH,
                " AND a.nominalTimestamp >= :startNominal", "startNominal");
        addTimeCondition(sb, offset, eachTime, BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH,
                " AND a.nominalTimestamp <= :endNominal", "endNominal");
    }

    // Add one time condition and record its parameter, if the filter has a value for filterKey.
    // Only the first value of the filter is used.
    private void addTimeCondition(StringBuilder sb, int offset, Map<String, Timestamp> eachTime, String filterKey,
            String condition, String paramName) throws ParseException {
        List<String> times = bulkFilter.get(filterKey);
        if (times == null || times.isEmpty()) {
            return;
        }
        Timestamp ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
        sb.insert(offset - 1, condition);
        eachTime.put(paramName, ts);
    }

    /**
     * Convert one row of the action query into a response holding bundle, coordinator and action.
     * Null columns are skipped and leave the corresponding bean property unset.
     *
     * @param bundleBean bundle the row was queried for
     * @param arr row columns: 0-9 action fields, 10-12 coordinator id, app name and status
     * @return the populated response
     */
    private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object[] arr) {
        CoordinatorActionBean actionBean = new CoordinatorActionBean();
        CoordinatorJobBean coordBean = new CoordinatorJobBean();

        if (arr[0] != null) {
            actionBean.setId((String) arr[0]);
        }
        if (arr[1] != null) {
            actionBean.setActionNumber((Integer) arr[1]);
        }
        if (arr[2] != null) {
            actionBean.setErrorCode((String) arr[2]);
        }
        if (arr[3] != null) {
            actionBean.setErrorMessage((String) arr[3]);
        }
        if (arr[4] != null) {
            actionBean.setExternalId((String) arr[4]);
        }
        if (arr[5] != null) {
            actionBean.setExternalStatus((String) arr[5]);
        }
        if (arr[6] != null) {
            actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
        }
        if (arr[7] != null) {
            actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
        }
        if (arr[8] != null) {
            actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
        }
        if (arr[9] != null) {
            actionBean.setMissingDependenciesBlob((StringBlob) arr[9]);
        }
        if (arr[10] != null) {
            // the coordinator id is also the action's parent job id
            coordBean.setId((String) arr[10]);
            actionBean.setJobId((String) arr[10]);
        }
        if (arr[11] != null) {
            coordBean.setAppName((String) arr[11]);
        }
        if (arr[12] != null) {
            coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
        }

        BulkResponseImpl bean = new BulkResponseImpl();
        bean.setBundle(bundleBean);
        bean.setCoordinator(coordBean);
        bean.setAction(actionBean);
        return bean;
    }

    /**
     * Convert one row of the bundle query (id, app name, status, user) into a BundleJobBean.
     *
     * @param barr row columns
     * @return the populated bean
     * @throws JPAExecutorException if the id column is null
     */
    private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException {
        if (barr[0] == null) {
            throw new JPAExecutorException(ErrorCode.E0603,
                    "bundleId returned by query is null - cannot retrieve bulk results");
        }
        BundleJobBean bean = new BundleJobBean();
        bean.setId((String) barr[0]);
        if (barr[1] != null) {
            bean.setAppName((String) barr[1]);
        }
        if (barr[2] != null) {
            bean.setStatus(BundleJob.Status.valueOf((String) barr[2]));
        }
        if (barr[3] != null) {
            bean.setUser((String) barr[3]);
        }
        return bean;
    }

    // null safeguard
    public static List<String> nullToEmpty(List<String> input) {
        return input == null ? Collections.<String> emptyList() : input;
    }

}