001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import org.apache.oozie.coord.CoordUtils;
022import org.apache.oozie.CoordinatorActionBean;
023import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.StringBlob;
026import org.apache.oozie.client.CoordinatorAction;
027import org.apache.oozie.service.Services;
028import org.apache.oozie.util.DateUtils;
029import org.apache.oozie.util.Pair;
030import org.apache.oozie.util.ParamChecker;
031
032import javax.persistence.EntityManager;
033import javax.persistence.Query;
034import java.sql.Timestamp;
035import java.util.ArrayList;
036import java.util.List;
037import java.util.Map;
038
039/**
040 * Load coordinator actions by offset and len (a subset) for a coordinator job.
041 */
042public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
043
044    private String coordJobId = null;
045    private int offset = 1;
046    private int len = 50;
047    private boolean desc = false;
048    private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap;
049
050    public CoordJobGetActionsSubsetJPAExecutor(String coordJobId) {
051        ParamChecker.notNull(coordJobId, "coordJobId");
052        this.coordJobId = coordJobId;
053    }
054
055    public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, Map<Pair<String, FILTER_COMPARATORS>,
056            List<Object>> filterMap, int offset, int len, boolean desc) {
057        this(coordJobId);
058        this.filterMap = filterMap;
059        this.offset = offset;
060        this.len = len;
061        this.desc = desc;
062    }
063
064    @Override
065    public String getName() {
066        return "CoordJobGetActionsSubsetJPAExecutor";
067    }
068
069    @Override
070    @SuppressWarnings("unchecked")
071    public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
072        List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
073        try {
074            if (!Services.get().getConf()
075                    .getBoolean(CoordActionGetForInfoJPAExecutor.COORD_GET_ALL_COLS_FOR_ACTION, false)) {
076                Query q = em.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME");
077                q = setQueryParameters(q, em);
078                List<Object[]> actions = q.getResultList();
079
080                for (Object[] a : actions) {
081                    CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
082                    actionList.add(aa);
083                }
084            } else {
085                Query q = em.createNamedQuery("GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME");
086                q = setQueryParameters(q, em);
087                actionList = q.getResultList();
088            }
089        }
090        catch (Exception e) {
091            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
092        }
093        return actionList;
094    }
095
096    private Query setQueryParameters(Query q, EntityManager em){
097        Map<String, Object> params = null;
098        if (filterMap != null) {
099            // Add the filter clause
100            String query = q.toString();
101            StringBuilder sbTotal = new StringBuilder(query);
102            int offset = query.lastIndexOf("order");
103            // Get the 'where' clause for status filters
104            StringBuilder statusClause = new StringBuilder();
105            params = CoordUtils.getWhereClause(statusClause, filterMap);
106            // Insert 'where' before 'order by'
107            sbTotal.insert(offset, statusClause);
108            q = em.createQuery(sbTotal.toString());
109        }
110        if (desc) {
111            q = em.createQuery(q.toString().concat(" desc"));
112        }
113        if (params != null) {
114            for (String pname : params.keySet()) {
115                q.setParameter(pname, params.get(pname));
116            }
117        }
118        q.setParameter("jobId", coordJobId);
119        q.setFirstResult(offset - 1);
120        q.setMaxResults(len);
121        return q;
122    }
123
124    private CoordinatorActionBean getBeanForRunningCoordAction(Object arr[]) {
125        CoordinatorActionBean bean = new CoordinatorActionBean();
126        if (arr[0] != null) {
127            bean.setId((String) arr[0]);
128        }
129        if (arr[1] != null) {
130            bean.setActionNumber((Integer) arr[1]);
131        }
132        if (arr[2] != null) {
133            bean.setConsoleUrl((String) arr[2]);
134        }
135        if (arr[3] != null) {
136            bean.setErrorCode((String) arr[3]);
137        }
138        if (arr[4] != null) {
139            bean.setErrorMessage((String) arr[4]);
140        }
141        if (arr[5] != null) {
142            bean.setExternalId((String) arr[5]);
143        }
144        if (arr[6] != null) {
145            bean.setExternalStatus((String) arr[6]);
146        }
147        if (arr[7] != null) {
148            bean.setJobId((String) arr[7]);
149        }
150        if (arr[8] != null) {
151            bean.setTrackerUri((String) arr[8]);
152        }
153        if (arr[9] != null) {
154            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[9]));
155        }
156        if (arr[10] != null) {
157            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[10]));
158        }
159        if (arr[11] != null) {
160            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[11]));
161        }
162        if (arr[12] != null) {
163            bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[12]));
164        }
165        if (arr[13] != null) {
166            bean.setMissingDependenciesBlob((StringBlob) arr[13]);
167        }
168        if (arr[14] != null) {
169            bean.setPushMissingDependenciesBlob((StringBlob) arr[14]);
170        }
171        if (arr[15] != null) {
172            bean.setTimeOut((Integer) arr[15]);
173        }
174        return bean;
175
176    }
177
178}