001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.executor.jpa; 019 020import org.apache.oozie.CoordinatorActionBean; 021import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS; 022import org.apache.oozie.ErrorCode; 023import org.apache.oozie.StringBlob; 024import org.apache.oozie.client.CoordinatorAction; 025import org.apache.oozie.client.OozieClient; 026import org.apache.oozie.service.Services; 027import org.apache.oozie.util.DateUtils; 028import org.apache.oozie.util.Pair; 029import org.apache.oozie.util.ParamChecker; 030 031import javax.persistence.EntityManager; 032import javax.persistence.Query; 033import java.sql.Timestamp; 034import java.util.ArrayList; 035import java.util.HashMap; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039 040/** 041 * Load coordinator actions by offset and len (a subset) for a coordinator job. 042 */ 043public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> { 044 045 private String coordJobId = null; 046 private int offset = 1; 047 private int len = 50; 048 private boolean desc = false; 049 private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap; 050 051 public CoordJobGetActionsSubsetJPAExecutor(String coordJobId) { 052 ParamChecker.notNull(coordJobId, "coordJobId"); 053 this.coordJobId = coordJobId; 054 } 055 056 public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap, 057 int offset, int len, boolean desc) { 058 this(coordJobId); 059 this.filterMap = filterMap; 060 this.offset = offset; 061 this.len = len; 062 this.desc = desc; 063 } 064 065 @Override 066 public String getName() { 067 return "CoordJobGetActionsSubsetJPAExecutor"; 068 } 069 070 @Override 071 @SuppressWarnings("unchecked") 072 public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException { 073 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 074 try { 075 if (!Services.get().getConf() 076 .getBoolean(CoordActionGetForInfoJPAExecutor.COORD_GET_ALL_COLS_FOR_ACTION, false)) { 077 Query q = em.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME"); 078 q = setQueryParameters(q, em); 079 List<Object[]> actions = q.getResultList(); 080 081 for (Object[] a : actions) { 082 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 083 actionList.add(aa); 084 } 085 } else { 086 Query q = em.createNamedQuery("GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME"); 087 q = setQueryParameters(q, em); 088 actionList = q.getResultList(); 089 } 090 } 091 catch (Exception e) { 092 throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); 093 } 094 return actionList; 095 } 096 097 private Query setQueryParameters(Query q, EntityManager em){ 098 Map<String, Object> params = null; 099 if (filterMap != null) { 100 // Add the filter clause 101 String query = q.toString(); 102 StringBuilder sbTotal = new StringBuilder(query); 103 int offset = query.lastIndexOf("order"); 104 // Get the 'where' clause for status filters 105 StringBuilder statusClause = new StringBuilder(); 106 params = getWhereClause(statusClause, filterMap); 107 // Insert 'where' before 'order by' 108 sbTotal.insert(offset, statusClause); 109 q = em.createQuery(sbTotal.toString()); 110 } 111 if (desc) { 112 q = em.createQuery(q.toString().concat(" desc")); 113 } 114 if (params != null) { 115 for (String pname : params.keySet()) { 116 q.setParameter(pname, params.get(pname)); 117 } 118 } 119 q.setParameter("jobId", coordJobId); 120 q.setFirstResult(offset - 1); 121 q.setMaxResults(len); 122 return q; 123 } 124 125 // Form the where clause to filter by status values 126 private Map<String, Object> getWhereClause(StringBuilder sb, Map<Pair<String, FILTER_COMPARATORS>, 127 List<Object>> filterMap) { 128 Map<String, Object> params = new HashMap<String, Object>(); 129 int pcnt= 1; 130 for (Entry<Pair<String, FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) { 131 String field = filter.getKey().getFist(); 132 FILTER_COMPARATORS comp = filter.getKey().getSecond(); 133 String sqlField; 134 if (field.equals(OozieClient.FILTER_STATUS)) { 135 sqlField = "a.statusStr"; 136 } else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) { 137 sqlField = "a.nominalTimestamp"; 138 } else { 139 throw new IllegalArgumentException("Invalid filter key " + field); 140 } 141 142 sb.append(" and ").append(sqlField).append(" "); 143 switch (comp) { 144 case EQUALS: 145 sb.append("IN ("); 146 params.putAll(appendParams(sb, filter.getValue(), pcnt)); 147 sb.append(")"); 148 break; 149 150 case NOT_EQUALS: 151 sb.append("NOT IN ("); 152 params.putAll(appendParams(sb, filter.getValue(), pcnt)); 153 sb.append(")"); 154 break; 155 156 case GREATER: 157 case GREATER_EQUAL: 158 case LESSTHAN: 159 case LESSTHAN_EQUAL: 160 if (filter.getValue().size() != 1) { 161 throw new IllegalArgumentException(field + comp.getSign() + " can't have more than 1 values"); 162 } 163 164 sb.append(comp.getSign()).append(" "); 165 params.putAll(appendParams(sb, filter.getValue(), pcnt)); 166 break; 167 } 168 169 pcnt += filter.getValue().size(); 170 } 171 sb.append(" "); 172 return params; 173 } 174 175 private Map<String, Object> appendParams(StringBuilder sb, List<Object> value, int sindex) { 176 Map<String, Object> params = new HashMap<String, Object>(); 177 boolean first = true; 178 for (Object val : value) { 179 String pname = "p" + sindex++; 180 params.put(pname, val); 181 if (!first) { 182 sb.append(", "); 183 } 184 sb.append(':').append(pname); 185 first = false; 186 } 187 return params; 188 } 189 190 private CoordinatorActionBean getBeanForRunningCoordAction(Object arr[]) { 191 CoordinatorActionBean bean = new CoordinatorActionBean(); 192 if (arr[0] != null) { 193 bean.setId((String) arr[0]); 194 } 195 if (arr[1] != null) { 196 bean.setActionNumber((Integer) arr[1]); 197 } 198 if (arr[2] != null) { 199 bean.setConsoleUrl((String) arr[2]); 200 } 201 if (arr[3] != null) { 202 bean.setErrorCode((String) arr[3]); 203 } 204 if (arr[4] != null) { 205 bean.setErrorMessage((String) arr[4]); 206 } 207 if (arr[5] != null) { 208 bean.setExternalId((String) arr[5]); 209 } 210 if (arr[6] != null) { 211 bean.setExternalStatus((String) arr[6]); 212 } 213 if (arr[7] != null) { 214 bean.setJobId((String) arr[7]); 215 } 216 if (arr[8] != null) { 217 bean.setTrackerUri((String) arr[8]); 218 } 219 if (arr[9] != null) { 220 bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[9])); 221 } 222 if (arr[10] != null) { 223 bean.setNominalTime(DateUtils.toDate((Timestamp) arr[10])); 224 } 225 if (arr[11] != null) { 226 bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[11])); 227 } 228 if (arr[12] != null) { 229 bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[12])); 230 } 231 if (arr[13] != null) { 232 bean.setMissingDependenciesBlob((StringBlob) arr[13]); 233 } 234 if (arr[14] != null) { 235 bean.setPushMissingDependenciesBlob((StringBlob) arr[14]); 236 } 237 if (arr[15] != null) { 238 bean.setTimeOut((Integer) arr[15]); 239 } 240 return bean; 241 242 } 243 244}