001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import javax.persistence.EntityManager;
027import javax.persistence.Query;
028
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.StringBlob;
032import org.apache.oozie.service.JPAService;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.util.DateUtils;
035
036/**
037 * Query Executor that provides API to run query for Coordinator Job
038 */
039
040public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, CoordJobQueryExecutor.CoordJobQuery> {
041
042    public enum CoordJobQuery {
043        UPDATE_COORD_JOB,
044        UPDATE_COORD_JOB_STATUS,
045        UPDATE_COORD_JOB_BUNDLEID,
046        UPDATE_COORD_JOB_APPNAMESPACE,
047        UPDATE_COORD_JOB_STATUS_PENDING,
048        UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME,
049        UPDATE_COORD_JOB_STATUS_MODTIME,
050        UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
051        UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
052        UPDATE_COORD_JOB_STATUS_PENDING_TIME,
053        UPDATE_COORD_JOB_MATERIALIZE,
054        UPDATE_COORD_JOB_CHANGE,
055        UPDATE_COORD_JOB_CONF,
056        UPDATE_COORD_JOB_XML,
057        GET_COORD_JOB,
058        GET_COORD_JOB_USER_APPNAME,
059        GET_COORD_JOB_INPUT_CHECK,
060        GET_COORD_JOB_ACTION_READY,
061        GET_COORD_JOB_ACTION_KILL,
062        GET_COORD_JOB_MATERIALIZE,
063        GET_COORD_JOB_SUSPEND_KILL,
064        GET_COORD_JOB_STATUS,
065        GET_COORD_JOB_STATUS_PARENTID,
066        GET_COORD_JOBS_CHANGED,
067        GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION,
068        GET_COORD_FOR_ABANDONEDCHECK,
069        GET_COORD_IDS_FOR_STATUS_TRANSIT,
070        GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID,
071        GET_COORD_JOBS_WITH_PARENT_ID,
072        GET_COORD_JOB_CONF,
073        GET_COORD_JOB_XML
074    };
075
076    private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
077
078    private CoordJobQueryExecutor() {
079    }
080
081    public static CoordJobQueryExecutor getInstance() {
082        return CoordJobQueryExecutor.instance;
083    }
084
085    @Override
086    public Query getUpdateQuery(CoordJobQuery namedQuery, CoordinatorJobBean cjBean, EntityManager em)
087            throws JPAExecutorException {
088        Query query = em.createNamedQuery(namedQuery.name());
089        switch (namedQuery) {
090            case UPDATE_COORD_JOB:
091                query.setParameter("appName", cjBean.getAppName());
092                query.setParameter("appPath", cjBean.getAppPath());
093                query.setParameter("concurrency", cjBean.getConcurrency());
094                query.setParameter("conf", cjBean.getConfBlob());
095                query.setParameter("externalId", cjBean.getExternalId());
096                query.setParameter("frequency", cjBean.getFrequency());
097                query.setParameter("lastActionNumber", cjBean.getLastActionNumber());
098                query.setParameter("timeOut", cjBean.getTimeout());
099                query.setParameter("timeZone", cjBean.getTimeZone());
100                query.setParameter("createdTime", cjBean.getCreatedTimestamp());
101                query.setParameter("endTime", cjBean.getEndTimestamp());
102                query.setParameter("execution", cjBean.getExecution());
103                query.setParameter("jobXml", cjBean.getJobXmlBlob());
104                query.setParameter("lastAction", cjBean.getLastActionTimestamp());
105                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
106                query.setParameter("nextMaterializedTime", cjBean.getNextMaterializedTimestamp());
107                query.setParameter("origJobXml", cjBean.getOrigJobXmlBlob());
108                query.setParameter("slaXml", cjBean.getSlaXmlBlob());
109                query.setParameter("startTime", cjBean.getStartTimestamp());
110                query.setParameter("status", cjBean.getStatus().toString());
111                query.setParameter("timeUnit", cjBean.getTimeUnitStr());
112                query.setParameter("appNamespace", cjBean.getAppNamespace());
113                query.setParameter("bundleId", cjBean.getBundleId());
114                query.setParameter("matThrottling", cjBean.getMatThrottling());
115                query.setParameter("id", cjBean.getId());
116                break;
117            case UPDATE_COORD_JOB_STATUS:
118                query.setParameter("status", cjBean.getStatus().toString());
119                query.setParameter("id", cjBean.getId());
120                break;
121            case UPDATE_COORD_JOB_BUNDLEID:
122                query.setParameter("bundleId", cjBean.getBundleId());
123                query.setParameter("id", cjBean.getId());
124                break;
125            case UPDATE_COORD_JOB_APPNAMESPACE:
126                query.setParameter("appNamespace", cjBean.getAppNamespace());
127                query.setParameter("id", cjBean.getId());
128                break;
129            case UPDATE_COORD_JOB_STATUS_PENDING:
130                query.setParameter("status", cjBean.getStatus().toString());
131                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
132                query.setParameter("id", cjBean.getId());
133                break;
134            case UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME:
135                query.setParameter("bundleId", cjBean.getBundleId());
136                query.setParameter("appNamespace", cjBean.getAppNamespace());
137                query.setParameter("pauseTime", cjBean.getPauseTimestamp());
138                query.setParameter("id", cjBean.getId());
139                break;
140            case UPDATE_COORD_JOB_STATUS_MODTIME:
141                query.setParameter("status", cjBean.getStatus().toString());
142                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
143                query.setParameter("id", cjBean.getId());
144                break;
145            case UPDATE_COORD_JOB_STATUS_PENDING_MODTIME:
146                query.setParameter("status", cjBean.getStatus().toString());
147                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
148                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
149                query.setParameter("id", cjBean.getId());
150                break;
151            case UPDATE_COORD_JOB_LAST_MODIFIED_TIME:
152                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
153                query.setParameter("id", cjBean.getId());
154                break;
155            case UPDATE_COORD_JOB_STATUS_PENDING_TIME:
156                query.setParameter("status", cjBean.getStatus().toString());
157                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
158                query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
159                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
160                query.setParameter("suspendedTime", cjBean.getSuspendedTimestamp());
161                query.setParameter("id", cjBean.getId());
162                break;
163            case UPDATE_COORD_JOB_MATERIALIZE:
164                query.setParameter("status", cjBean.getStatus().toString());
165                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
166                query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
167                query.setParameter("lastActionTime", cjBean.getLastActionTimestamp());
168                query.setParameter("lastActionNumber", cjBean.getLastActionNumber());
169                query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp());
170                query.setParameter("id", cjBean.getId());
171                break;
172            case UPDATE_COORD_JOB_CHANGE:
173                query.setParameter("endTime", cjBean.getEndTimestamp());
174                query.setParameter("status", cjBean.getStatus().toString());
175                query.setParameter("pending", cjBean.isPending() ? 1 : 0);
176                query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
177                query.setParameter("concurrency", cjBean.getConcurrency());
178                query.setParameter("pauseTime", cjBean.getPauseTimestamp());
179                query.setParameter("lastActionNumber", cjBean.getLastActionNumber());
180                query.setParameter("lastActionTime", cjBean.getLastActionTimestamp());
181                query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp());
182                query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
183                query.setParameter("id", cjBean.getId());
184                break;
185            case UPDATE_COORD_JOB_CONF:
186                query.setParameter("conf", cjBean.getConfBlob());
187                query.setParameter("id", cjBean.getId());
188                break;
189            case UPDATE_COORD_JOB_XML:
190                query.setParameter("jobXml", cjBean.getJobXmlBlob());
191                query.setParameter("id", cjBean.getId());
192                break;
193
194            default:
195                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
196                        + namedQuery.name());
197        }
198        return query;
199    }
200
201    @Override
202    public Query getSelectQuery(CoordJobQuery namedQuery, EntityManager em, Object... parameters)
203            throws JPAExecutorException {
204        Query query = em.createNamedQuery(namedQuery.name());
205        switch (namedQuery) {
206            case GET_COORD_JOB:
207            case GET_COORD_JOB_USER_APPNAME:
208            case GET_COORD_JOB_INPUT_CHECK:
209            case GET_COORD_JOB_ACTION_READY:
210            case GET_COORD_JOB_ACTION_KILL:
211            case GET_COORD_JOB_MATERIALIZE:
212            case GET_COORD_JOB_SUSPEND_KILL:
213            case GET_COORD_JOB_STATUS:
214            case GET_COORD_JOB_STATUS_PARENTID:
215            case GET_COORD_JOB_CONF:
216            case GET_COORD_JOB_XML:
217                query.setParameter("id", parameters[0]);
218                break;
219            case GET_COORD_JOBS_CHANGED:
220                query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
221                break;
222            case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
223                query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime()));
224                int limit = (Integer) parameters[1];
225                if (limit > 0) {
226                    query.setMaxResults(limit);
227                }
228                break;
229            case GET_COORD_FOR_ABANDONEDCHECK:
230                query.setParameter(1, (Integer) parameters[0]);
231                query.setParameter(2, (Timestamp) parameters[1]);
232                break;
233
234            case GET_COORD_IDS_FOR_STATUS_TRANSIT:
235                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
236                break;
237            case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
238                query.setParameter("appName", parameters[0]);
239                query.setParameter("bundleId", parameters[1]);
240                break;
241            case GET_COORD_JOBS_WITH_PARENT_ID:
242                query.setParameter("parentId", parameters[0]);
243                break;
244            default:
245                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
246                        + namedQuery.name());
247        }
248        return query;
249    }
250
251    @Override
252    public int executeUpdate(CoordJobQuery namedQuery, CoordinatorJobBean jobBean) throws JPAExecutorException {
253        JPAService jpaService = Services.get().get(JPAService.class);
254        EntityManager em = jpaService.getEntityManager();
255        Query query = getUpdateQuery(namedQuery, jobBean, em);
256        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
257        return ret;
258    }
259
260    private CoordinatorJobBean constructBean(CoordJobQuery namedQuery, Object ret, Object... parameters)
261            throws JPAExecutorException {
262        CoordinatorJobBean bean;
263        Object[] arr;
264        switch (namedQuery) {
265            case GET_COORD_JOB:
266                bean = (CoordinatorJobBean) ret;
267                break;
268            case GET_COORD_JOB_USER_APPNAME:
269                bean = new CoordinatorJobBean();
270                arr = (Object[]) ret;
271                bean.setUser((String) arr[0]);
272                bean.setAppName((String) arr[1]);
273                break;
274            case GET_COORD_JOB_INPUT_CHECK:
275                bean = new CoordinatorJobBean();
276                arr = (Object[]) ret;
277                bean.setUser((String) arr[0]);
278                bean.setAppName((String) arr[1]);
279                bean.setStatusStr((String) arr[2]);
280                bean.setAppNamespace((String) arr[3]);
281                bean.setExecution((String) arr[4]);
282                bean.setFrequency((String) arr[5]);
283                bean.setTimeUnitStr((String) arr[6]);
284                bean.setTimeZone((String) arr[7]);
285                bean.setStartTime(DateUtils.toDate((Timestamp) arr[8]));
286                bean.setEndTime(DateUtils.toDate((Timestamp) arr[9]));
287                bean.setJobXmlBlob((StringBlob) arr[10]);
288                break;
289            case GET_COORD_JOB_ACTION_READY:
290                bean = new CoordinatorJobBean();
291                arr = (Object[]) ret;
292                bean.setId((String) arr[0]);
293                bean.setUser((String) arr[1]);
294                bean.setGroup((String) arr[2]);
295                bean.setAppName((String) arr[3]);
296                bean.setStatusStr((String) arr[4]);
297                bean.setExecution((String) arr[5]);
298                bean.setConcurrency((Integer) arr[6]);
299                bean.setFrequency((String) arr[7]);
300                bean.setTimeUnitStr((String) arr[8]);
301                bean.setTimeZone((String) arr[9]);
302                bean.setStartTime(DateUtils.toDate((Timestamp) arr[10]));
303                bean.setEndTime(DateUtils.toDate((Timestamp) arr[11]));
304                bean.setJobXmlBlob((StringBlob) arr[12]);
305                break;
306            case GET_COORD_JOB_ACTION_KILL:
307                bean = new CoordinatorJobBean();
308                arr = (Object[]) ret;
309                bean.setId((String) arr[0]);
310                bean.setUser((String) arr[1]);
311                bean.setGroup((String) arr[2]);
312                bean.setAppName((String) arr[3]);
313                bean.setStatusStr((String) arr[4]);
314                break;
315            case GET_COORD_JOB_MATERIALIZE:
316                bean = new CoordinatorJobBean();
317                arr = (Object[]) ret;
318                bean.setId((String) arr[0]);
319                bean.setUser((String) arr[1]);
320                bean.setGroup((String) arr[2]);
321                bean.setAppName((String) arr[3]);
322                bean.setStatusStr((String) arr[4]);
323                bean.setFrequency((String) arr[5]);
324                bean.setMatThrottling((Integer) arr[6]);
325                bean.setTimeout((Integer) arr[7]);
326                bean.setTimeZone((String) arr[8]);
327                bean.setStartTime(DateUtils.toDate((Timestamp) arr[9]));
328                bean.setEndTime(DateUtils.toDate((Timestamp) arr[10]));
329                bean.setPauseTime(DateUtils.toDate((Timestamp) arr[11]));
330                bean.setNextMaterializedTime(DateUtils.toDate((Timestamp) arr[12]));
331                bean.setLastActionTime(DateUtils.toDate((Timestamp) arr[13]));
332                bean.setLastActionNumber((Integer) arr[14]);
333                bean.setDoneMaterialization((Integer) arr[15]);
334                bean.setBundleId((String) arr[16]);
335                bean.setConfBlob((StringBlob) arr[17]);
336                bean.setJobXmlBlob((StringBlob) arr[18]);
337                bean.setAppNamespace((String) arr[19]);
338                bean.setTimeUnitStr((String) arr[20]);
339                bean.setExecution((String) arr[21]);
340                break;
341            case GET_COORD_JOB_SUSPEND_KILL:
342                bean = new CoordinatorJobBean();
343                arr = (Object[]) ret;
344                bean.setId((String) arr[0]);
345                bean.setUser((String) arr[1]);
346                bean.setGroup((String) arr[2]);
347                bean.setAppName((String) arr[3]);
348                bean.setStatusStr((String) arr[4]);
349                bean.setBundleId((String) arr[5]);
350                bean.setAppNamespace((String) arr[6]);
351                bean.setDoneMaterialization((Integer) arr[7]);
352                break;
353            case GET_COORD_JOB_STATUS:
354                bean = new CoordinatorJobBean();
355                bean.setId((String) parameters[0]);
356                bean.setStatusStr((String) ret);
357                break;
358            case GET_COORD_JOB_STATUS_PARENTID:
359                bean = new CoordinatorJobBean();
360                arr = (Object[]) ret;
361                bean.setId((String) parameters[0]);
362                bean.setStatusStr((String) arr[0]);
363                bean.setBundleId((String) arr[1]);
364                break;
365            case GET_COORD_JOBS_CHANGED:
366                bean = (CoordinatorJobBean) ret;
367                break;
368            case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
369                bean = new CoordinatorJobBean();
370                bean.setId((String) ret);
371                break;
372            case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
373                bean = new CoordinatorJobBean();
374                bean.setId((String) ret);
375                break;
376            case GET_COORD_JOBS_WITH_PARENT_ID:
377                bean = new CoordinatorJobBean();
378                bean.setId((String) ret);
379                break;
380            case GET_COORD_FOR_ABANDONEDCHECK:
381                bean = new CoordinatorJobBean();
382                arr = (Object[]) ret;
383                bean.setId((String) arr[0]);
384                bean.setUser((String) arr[1]);
385                bean.setGroup((String) arr[2]);
386                bean.setAppName((String) arr[3]);
387                break;
388            case GET_COORD_IDS_FOR_STATUS_TRANSIT:
389                bean = new CoordinatorJobBean();
390                bean.setId((String) ret);
391                break;
392            case GET_COORD_JOB_CONF:
393                bean = new CoordinatorJobBean();
394                bean.setConfBlob((StringBlob) ret);
395                break;
396            case GET_COORD_JOB_XML:
397                bean = new CoordinatorJobBean();
398                bean.setJobXmlBlob((StringBlob) ret);
399                break;
400
401            default:
402                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
403                        + namedQuery.name());
404        }
405        return bean;
406    }
407
408    @Override
409    public CoordinatorJobBean get(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
410        CoordinatorJobBean bean = getIfExist(namedQuery, parameters);
411        if (bean == null) {
412            throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
413                    Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
414        }
415        return bean;
416    }
417
418    @Override
419    public List<CoordinatorJobBean> getList(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
420        JPAService jpaService = Services.get().get(JPAService.class);
421        EntityManager em = jpaService.getEntityManager();
422        Query query = getSelectQuery(namedQuery, em, parameters);
423        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
424        List<CoordinatorJobBean> beanList = new ArrayList<CoordinatorJobBean>();
425        if (retList != null) {
426            for (Object ret : retList) {
427                beanList.add(constructBean(namedQuery, ret, parameters));
428            }
429        }
430        return beanList;
431    }
432
433    @Override
434    public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
435        throw new UnsupportedOperationException();
436    }
437
438    @Override
439    public CoordinatorJobBean getIfExist(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
440        JPAService jpaService = Services.get().get(JPAService.class);
441        EntityManager em = jpaService.getEntityManager();
442        Query query = getSelectQuery(namedQuery, em, parameters);
443        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
444        if (ret == null) {
445            return null;
446        }
447        CoordinatorJobBean bean = constructBean(namedQuery, ret, parameters);
448        return bean;
449    }
450}