001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.StringBlob;
031import org.apache.oozie.service.JPAService;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.util.DateUtils;
034
035/**
036 * Query Executor that provides API to run query for Coordinator Job
037 */
038
039public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, CoordJobQueryExecutor.CoordJobQuery> {
040
041    public enum CoordJobQuery {
042        UPDATE_COORD_JOB,
043        UPDATE_COORD_JOB_STATUS,
044        UPDATE_COORD_JOB_BUNDLEID,
045        UPDATE_COORD_JOB_APPNAMESPACE,
046        UPDATE_COORD_JOB_STATUS_PENDING,
047        UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME,
048        UPDATE_COORD_JOB_STATUS_MODTIME,
049        UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
050        UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
051        UPDATE_COORD_JOB_STATUS_PENDING_TIME,
052        UPDATE_COORD_JOB_MATERIALIZE,
053        UPDATE_COORD_JOB_CHANGE,
054        GET_COORD_JOB,
055        GET_COORD_JOB_USER_APPNAME,
056        GET_COORD_JOB_INPUT_CHECK,
057        GET_COORD_JOB_ACTION_READY,
058        GET_COORD_JOB_ACTION_KILL,
059        GET_COORD_JOB_MATERIALIZE,
060        GET_COORD_JOB_SUSPEND_KILL,
061        GET_COORD_JOB_STATUS_PARENTID,
062        GET_COORD_JOBS_CHANGED,
063        GET_COORD_JOBS_OLDER_FOR_MATERILZATION
064    };
065
066    private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
067
068    private CoordJobQueryExecutor() {
069    }
070
071    public static CoordJobQueryExecutor getInstance() {
072        return CoordJobQueryExecutor.instance;
073    }
074
075    @Override
076    public Query getUpdateQuery(CoordJobQuery namedQuery, CoordinatorJobBean cjBean, EntityManager em)
077            throws JPAExecutorException {
078        Query query = em.createNamedQuery(namedQuery.name());
079        switch (namedQuery) {
080            case UPDATE_COORD_JOB:
081                query.setParameter("appName", cjBean.getAppName());
082                query.setParameter("appPath", cjBean.getAppPath());
083                query.setParameter("concurrency", cjBean.getConcurrency());
084                query.setParameter("conf", cjBean.getConfBlob());
085                query.setParameter("externalId", cjBean.getExternalId());
086                query.setParameter("frequency", cjBean.getFrequency());
087                query.setParameter("lastActionNumber", cjBean.getLastActionNumber());
088                query.setParameter("timeOut", cjBean.getTimeout());
089                query.setParameter("timeZone", cjBean.getTimeZone());
090                query.setParameter("createdTime", cjBean.getCreatedTimestamp());
091                query.setParameter("endTime", cjBean.getEndTimestamp());
092                query.setParameter("execution", cjBean.getExecution());
093                query.setParameter("jobXml", cjBean.getJobXmlBlob());
094                query.setParameter("lastAction", cjBean.getLastActionTimestamp());
095                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
096                query.setParameter("nextMaterializedTime", cjBean.getNextMaterializedTimestamp());
097                query.setParameter("origJobXml", cjBean.getOrigJobXmlBlob());
098                query.setParameter("slaXml", cjBean.getSlaXmlBlob());
099                query.setParameter("startTime", cjBean.getStartTimestamp());
100                query.setParameter("status", cjBean.getStatus().toString());
101                query.setParameter("timeUnit", cjBean.getTimeUnitStr());
102                query.setParameter("appNamespace", cjBean.getAppNamespace());
103                query.setParameter("bundleId", cjBean.getBundleId());
104                query.setParameter("matThrottling", cjBean.getMatThrottling());
105                query.setParameter("id", cjBean.getId());
106                break;
107            case UPDATE_COORD_JOB_STATUS:
108                query.setParameter("status", cjBean.getStatus().toString());
109                query.setParameter("id", cjBean.getId());
110                break;
111            case UPDATE_COORD_JOB_BUNDLEID:
112                query.setParameter("bundleId", cjBean.getBundleId());
113                query.setParameter("id", cjBean.getId());
114                break;
115            case UPDATE_COORD_JOB_APPNAMESPACE:
116                query.setParameter("appNamespace", cjBean.getAppNamespace());
117                query.setParameter("id", cjBean.getId());
118                break;
119            case UPDATE_COORD_JOB_STATUS_PENDING:
120                query.setParameter("status", cjBean.getStatus().toString());
121                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
122                query.setParameter("id", cjBean.getId());
123                break;
124            case UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME:
125                query.setParameter("bundleId", cjBean.getBundleId());
126                query.setParameter("appNamespace", cjBean.getAppNamespace());
127                query.setParameter("pauseTime", cjBean.getPauseTimestamp());
128                query.setParameter("id", cjBean.getId());
129                break;
130            case UPDATE_COORD_JOB_STATUS_MODTIME:
131                query.setParameter("status", cjBean.getStatus().toString());
132                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
133                query.setParameter("id", cjBean.getId());
134                break;
135            case UPDATE_COORD_JOB_STATUS_PENDING_MODTIME:
136                query.setParameter("status", cjBean.getStatus().toString());
137                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
138                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
139                query.setParameter("id", cjBean.getId());
140                break;
141            case UPDATE_COORD_JOB_LAST_MODIFIED_TIME:
142                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
143                query.setParameter("id", cjBean.getId());
144                break;
145            case UPDATE_COORD_JOB_STATUS_PENDING_TIME:
146                query.setParameter("status", cjBean.getStatus().toString());
147                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
148                query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
149                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
150                query.setParameter("suspendedTime", cjBean.getSuspendedTimestamp());
151                query.setParameter("id", cjBean.getId());
152                break;
153            case UPDATE_COORD_JOB_MATERIALIZE:
154                query.setParameter("status", cjBean.getStatus().toString());
155                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
156                query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
157                query.setParameter("lastActionTime", cjBean.getLastActionTimestamp());
158                query.setParameter("lastActionNumber", cjBean.getLastActionNumber());
159                query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp());
160                query.setParameter("id", cjBean.getId());
161                break;
162            case UPDATE_COORD_JOB_CHANGE:
163                query.setParameter("endTime", cjBean.getEndTimestamp());
164                query.setParameter("status", cjBean.getStatus().toString());
165                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
166                query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
167                query.setParameter("concurrency", cjBean.getConcurrency());
168                query.setParameter("pauseTime", cjBean.getPauseTimestamp());
169                query.setParameter("lastActionNumber", cjBean.getLastActionNumber());
170                query.setParameter("lastActionTime", cjBean.getLastActionTimestamp());
171                query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp());
172                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
173                query.setParameter("id", cjBean.getId());
174                break;
175            default:
176                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
177                        + namedQuery.name());
178        }
179        return query;
180    }
181
182    @Override
183    public Query getSelectQuery(CoordJobQuery namedQuery, EntityManager em, Object... parameters)
184            throws JPAExecutorException {
185        Query query = em.createNamedQuery(namedQuery.name());
186        switch (namedQuery) {
187            case GET_COORD_JOB:
188            case GET_COORD_JOB_USER_APPNAME:
189            case GET_COORD_JOB_INPUT_CHECK:
190            case GET_COORD_JOB_ACTION_READY:
191            case GET_COORD_JOB_ACTION_KILL:
192            case GET_COORD_JOB_MATERIALIZE:
193            case GET_COORD_JOB_SUSPEND_KILL:
194            case GET_COORD_JOB_STATUS_PARENTID:
195                query.setParameter("id", parameters[0]);
196                break;
197            case GET_COORD_JOBS_CHANGED:
198                query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
199                break;
200            case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
201                query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime()));
202                int limit = (Integer) parameters[1];
203                if (limit > 0) {
204                    query.setMaxResults(limit);
205                }
206                break;
207
208            default:
209                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
210                        + namedQuery.name());
211        }
212        return query;
213    }
214
215    @Override
216    public int executeUpdate(CoordJobQuery namedQuery, CoordinatorJobBean jobBean) throws JPAExecutorException {
217        JPAService jpaService = Services.get().get(JPAService.class);
218        EntityManager em = jpaService.getEntityManager();
219        Query query = getUpdateQuery(namedQuery, jobBean, em);
220        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
221        return ret;
222    }
223
224    private CoordinatorJobBean constructBean(CoordJobQuery namedQuery, Object ret, Object... parameters)
225            throws JPAExecutorException {
226        CoordinatorJobBean bean;
227        Object[] arr;
228        switch (namedQuery) {
229            case GET_COORD_JOB:
230                bean = (CoordinatorJobBean) ret;
231                break;
232            case GET_COORD_JOB_USER_APPNAME:
233                bean = new CoordinatorJobBean();
234                arr = (Object[]) ret;
235                bean.setUser((String) arr[0]);
236                bean.setAppName((String) arr[1]);
237                break;
238            case GET_COORD_JOB_INPUT_CHECK:
239                bean = new CoordinatorJobBean();
240                arr = (Object[]) ret;
241                bean.setUser((String) arr[0]);
242                bean.setAppName((String) arr[1]);
243                bean.setStatusStr((String) arr[2]);
244                bean.setAppNamespace((String) arr[3]);
245                bean.setExecution((String) arr[4]);
246                bean.setFrequency((String) arr[5]);
247                bean.setTimeUnitStr((String) arr[6]);
248                bean.setTimeZone((String) arr[7]);
249                bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
250                break;
251            case GET_COORD_JOB_ACTION_READY:
252                bean = new CoordinatorJobBean();
253                arr = (Object[]) ret;
254                bean.setId((String) arr[0]);
255                bean.setUser((String) arr[1]);
256                bean.setGroup((String) arr[2]);
257                bean.setAppName((String) arr[3]);
258                bean.setStatusStr((String) arr[4]);
259                bean.setExecution((String) arr[5]);
260                bean.setConcurrency((Integer) arr[6]);
261                break;
262            case GET_COORD_JOB_ACTION_KILL:
263                bean = new CoordinatorJobBean();
264                arr = (Object[]) ret;
265                bean.setId((String) arr[0]);
266                bean.setUser((String) arr[1]);
267                bean.setGroup((String) arr[2]);
268                bean.setAppName((String) arr[3]);
269                bean.setStatusStr((String) arr[4]);
270                break;
271            case GET_COORD_JOB_MATERIALIZE:
272                bean = new CoordinatorJobBean();
273                arr = (Object[]) ret;
274                bean.setId((String) arr[0]);
275                bean.setUser((String) arr[1]);
276                bean.setGroup((String) arr[2]);
277                bean.setAppName((String) arr[3]);
278                bean.setStatusStr((String) arr[4]);
279                bean.setFrequency((String) arr[5]);
280                bean.setMatThrottling((Integer) arr[6]);
281                bean.setTimeout((Integer) arr[7]);
282                bean.setTimeZone((String) arr[8]);
283                bean.setStartTime(DateUtils.toDate((Timestamp) arr[9]));
284                bean.setEndTime(DateUtils.toDate((Timestamp) arr[10]));
285                bean.setPauseTime(DateUtils.toDate((Timestamp) arr[11]));
286                bean.setNextMaterializedTime(DateUtils.toDate((Timestamp) arr[12]));
287                bean.setLastActionTime(DateUtils.toDate((Timestamp) arr[13]));
288                bean.setLastActionNumber((Integer) arr[14]);
289                bean.setDoneMaterialization((Integer) arr[15]);
290                bean.setBundleId((String) arr[16]);
291                bean.setConfBlob((StringBlob) arr[17]);
292                bean.setJobXmlBlob((StringBlob) arr[18]);
293                bean.setAppNamespace((String) arr[19]);
294                bean.setTimeUnitStr((String) arr[20]);
295                bean.setExecution((String) arr[21]);
296                break;
297            case GET_COORD_JOB_SUSPEND_KILL:
298                bean = new CoordinatorJobBean();
299                arr = (Object[]) ret;
300                bean.setId((String) arr[0]);
301                bean.setUser((String) arr[1]);
302                bean.setGroup((String) arr[2]);
303                bean.setAppName((String) arr[3]);
304                bean.setStatusStr((String) arr[4]);
305                bean.setBundleId((String) arr[5]);
306                bean.setAppNamespace((String) arr[6]);
307                bean.setDoneMaterialization((Integer) arr[7]);
308                break;
309            case GET_COORD_JOB_STATUS_PARENTID:
310                bean = new CoordinatorJobBean();
311                arr = (Object[]) ret;
312                bean.setId((String) parameters[0]);
313                bean.setStatusStr((String) arr[0]);
314                bean.setBundleId((String) arr[1]);
315                break;
316            case GET_COORD_JOBS_CHANGED:
317                bean = (CoordinatorJobBean) ret;
318                break;
319            case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
320                bean = new CoordinatorJobBean();
321                bean.setId((String) ret);
322                break;
323            default:
324                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
325                        + namedQuery.name());
326        }
327        return bean;
328    }
329
330    @Override
331    public CoordinatorJobBean get(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
332        JPAService jpaService = Services.get().get(JPAService.class);
333        EntityManager em = jpaService.getEntityManager();
334        Query query = getSelectQuery(namedQuery, em, parameters);
335        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
336        if (ret == null) {
337            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
338        }
339        CoordinatorJobBean bean = constructBean(namedQuery, ret, parameters);
340        return bean;
341    }
342
343    @Override
344    public List<CoordinatorJobBean> getList(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
345        JPAService jpaService = Services.get().get(JPAService.class);
346        EntityManager em = jpaService.getEntityManager();
347        Query query = getSelectQuery(namedQuery, em, parameters);
348        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
349        List<CoordinatorJobBean> beanList = new ArrayList<CoordinatorJobBean>();
350        if (retList != null) {
351            for (Object ret : retList) {
352                beanList.add(constructBean(namedQuery, ret, parameters));
353            }
354        }
355        return beanList;
356    }
357
358    @Override
359    public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
360        throw new UnsupportedOperationException();
361    }
362
363}