001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.executor.jpa; 019 020 import java.sql.Timestamp; 021 import java.text.ParseException; 022 import java.util.ArrayList; 023 import java.util.Collections; 024 import java.util.HashMap; 025 import java.util.Iterator; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Map.Entry; 029 030 import javax.persistence.EntityManager; 031 import javax.persistence.Query; 032 033 import org.apache.oozie.BundleJobBean; 034 import org.apache.oozie.client.BundleJob; 035 import org.apache.oozie.client.CoordinatorJob; 036 import org.apache.oozie.client.rest.BulkResponseImpl; 037 import org.apache.oozie.BulkResponseInfo; 038 import org.apache.oozie.CoordinatorActionBean; 039 import org.apache.oozie.CoordinatorJobBean; 040 import org.apache.oozie.ErrorCode; 041 import org.apache.oozie.client.CoordinatorAction; 042 import org.apache.oozie.util.DateUtils; 043 import org.apache.oozie.util.ParamChecker; 044 045 /** 046 * The query executor class for bulk monitoring queries i.e. debugging bundle -> 047 * coord actions directly 048 */ 049 public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { 050 private Map<String, List<String>> bulkFilter; 051 // defaults 052 private int start = 1; 053 private int len = 50; 054 055 public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) { 056 ParamChecker.notNull(bulkFilter, "bulkFilter"); 057 this.bulkFilter = bulkFilter; 058 this.start = start; 059 this.len = len; 060 } 061 062 /* 063 * (non-Javadoc) 064 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() 065 */ 066 @Override 067 public String getName() { 068 return "BulkJPAExecutor"; 069 } 070 071 /* 072 * (non-Javadoc) 073 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) 074 */ 075 @Override 076 public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException { 077 List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>(); 078 Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>(); 079 080 try { 081 // Lightweight Query 1 on Bundle level to fetch the bundle job 082 // corresponding to name 083 BundleJobBean bundleBean = bundleQuery(em); 084 085 // Join query between coordinator job and coordinator action tables 086 // to get entries for specific bundleId only 087 String conditions = actionQuery(em, bundleBean, actionTimes, responseList); 088 089 // Query to get the count of records 090 long total = countQuery(conditions, em, bundleBean, actionTimes); 091 092 BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total); 093 return bulk; 094 } 095 catch (Exception e) { 096 throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); 097 } 098 } 099 100 @SuppressWarnings("unchecked") 101 private BundleJobBean bundleQuery(EntityManager em) throws JPAExecutorException { 102 BundleJobBean bundleBean = new BundleJobBean(); 103 String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0); 104 Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY"); 105 q.setParameter("appName", bundleName); 106 List<Object[]> bundles = (List<Object[]>) q.getResultList(); 107 if (bundles.isEmpty()) { 108 throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: " 109 + bundleName); 110 } 111 if (bundles.size() > 1) { // more than one bundles running with same 112 // name - ERROR. Fail fast 113 throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: " 114 + bundleName); 115 } 116 bundleBean = getBeanForBundleJob(bundles.get(0), bundleName); 117 return bundleBean; 118 } 119 120 @SuppressWarnings("unchecked") 121 private String actionQuery(EntityManager em, BundleJobBean bundleBean, 122 Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException { 123 Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY"); 124 StringBuilder getActions = new StringBuilder(q.toString()); 125 StringBuilder conditionClause = new StringBuilder(); 126 conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME))); 127 conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS))); 128 int offset = getActions.indexOf("ORDER"); 129 getActions.insert(offset - 1, conditionClause); 130 timesClause(getActions, offset, times); 131 q = em.createQuery(getActions.toString()); 132 Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); 133 while (iter.hasNext()) { 134 Entry<String, Timestamp> time = iter.next(); 135 q.setParameter(time.getKey(), time.getValue()); 136 } 137 q.setParameter("bundleId", bundleBean.getId()); 138 // pagination 139 q.setFirstResult(start - 1); 140 q.setMaxResults(len); 141 142 List<Object[]> response = q.getResultList(); 143 for (Object[] r : response) { 144 BulkResponseImpl br = getResponseFromObject(bundleBean, r); 145 responseList.add(br); 146 } 147 return q.toString(); 148 } 149 150 private long countQuery(String clause, EntityManager em, BundleJobBean bundleBean, Map<String, Timestamp> times) { 151 Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY"); 152 StringBuilder getTotal = new StringBuilder(q.toString() + " "); 153 getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER"))); 154 q = em.createQuery(getTotal.toString()); 155 q.setParameter("bundleId", bundleBean.getId()); 156 Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); 157 while (iter.hasNext()) { 158 Entry<String, Timestamp> time = iter.next(); 159 q.setParameter(time.getKey(), time.getValue()); 160 } 161 long total = ((Long) q.getSingleResult()).longValue(); 162 return total; 163 } 164 165 // Form the where clause to filter by coordinator names 166 private StringBuilder coordNamesClause(List<String> coordNames) { 167 StringBuilder sb = new StringBuilder(); 168 boolean firstVal = true; 169 for (String name : nullToEmpty(coordNames)) { 170 if (firstVal) { 171 sb.append(" AND c.appName IN (\'" + name + "\'"); 172 firstVal = false; 173 } 174 else { 175 sb.append(",\'" + name + "\'"); 176 } 177 } 178 if (!firstVal) { 179 sb.append(") "); 180 } 181 return sb; 182 } 183 184 // Form the where clause to filter by coord action status 185 private StringBuilder statusClause(List<String> statuses) { 186 StringBuilder sb = new StringBuilder(); 187 boolean firstVal = true; 188 for (String status : nullToEmpty(statuses)) { 189 if (firstVal) { 190 sb.append(" AND a.status IN (\'" + status + "\'"); 191 firstVal = false; 192 } 193 else { 194 sb.append(",\'" + status + "\'"); 195 } 196 } 197 if (!firstVal) { 198 sb.append(") "); 199 } 200 else { // statuses was null. adding default 201 sb.append(" AND a.status IN ('KILLED', 'FAILED') "); 202 } 203 return sb; 204 } 205 206 private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException { 207 Timestamp ts = null; 208 List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH); 209 if (times != null) { 210 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 211 sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated"); 212 eachTime.put("startCreated", ts); 213 } 214 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH); 215 if (times != null) { 216 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 217 sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated"); 218 eachTime.put("endCreated", ts); 219 } 220 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH); 221 if (times != null) { 222 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 223 sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal"); 224 eachTime.put("startNominal", ts); 225 } 226 times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH); 227 if (times != null) { 228 ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); 229 sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal"); 230 eachTime.put("endNominal", ts); 231 } 232 } 233 234 private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) { 235 BulkResponseImpl bean = new BulkResponseImpl(); 236 CoordinatorJobBean coordBean = new CoordinatorJobBean(); 237 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 238 if (arr[0] != null) { 239 actionBean.setId((String) arr[0]); 240 } 241 if (arr[1] != null) { 242 actionBean.setActionNumber((Integer) arr[1]); 243 } 244 if (arr[2] != null) { 245 actionBean.setErrorCode((String) arr[2]); 246 } 247 if (arr[3] != null) { 248 actionBean.setErrorMessage((String) arr[3]); 249 } 250 if (arr[4] != null) { 251 actionBean.setExternalId((String) arr[4]); 252 } 253 if (arr[5] != null) { 254 actionBean.setExternalStatus((String) arr[5]); 255 } 256 if (arr[6] != null) { 257 actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6])); 258 } 259 if (arr[7] != null) { 260 actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7])); 261 } 262 if (arr[8] != null) { 263 actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8])); 264 } 265 if (arr[9] != null) { 266 actionBean.setMissingDependencies((String) arr[9]); 267 } 268 if (arr[10] != null) { 269 coordBean.setId((String) arr[10]); 270 actionBean.setJobId((String) arr[10]); 271 } 272 if (arr[11] != null) { 273 coordBean.setAppName((String) arr[11]); 274 } 275 if (arr[12] != null) { 276 coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12])); 277 } 278 bean.setBundle(bundleBean); 279 bean.setCoordinator(coordBean); 280 bean.setAction(actionBean); 281 return bean; 282 } 283 284 private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws JPAExecutorException { 285 BundleJobBean bean = new BundleJobBean(); 286 if (barr[0] != null) { 287 bean.setId((String) barr[0]); 288 } 289 else { 290 throw new JPAExecutorException(ErrorCode.E0603, 291 "bundleId returned by query is null - cannot retrieve bulk results"); 292 } 293 bean.setAppName(name); 294 if (barr[1] != null) { 295 bean.setStatus(BundleJob.Status.valueOf((String) barr[1])); 296 } 297 return bean; 298 } 299 300 // null safeguard 301 public static List<String> nullToEmpty(List<String> input) { 302 return input == null ? Collections.<String> emptyList() : input; 303 } 304 305 }