001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import java.sql.Connection;
021    import java.sql.SQLException;
022    import javax.xml.validation.Schema;
023    
024    import org.apache.oozie.store.OozieSchema.OozieColumn;
025    import org.apache.oozie.store.OozieSchema.OozieTable;
026    import org.apache.oozie.workflow.WorkflowException;
027    import org.apache.oozie.workflow.WorkflowInstance;
028    import org.apache.oozie.util.ParamChecker;
029    import org.apache.oozie.util.WritableUtils;
030    import org.apache.oozie.util.db.SqlStatement.ResultSetReader;
031    import org.apache.oozie.util.db.SqlStatement;
032    import org.apache.oozie.ErrorCode;
033    
034    //TODO javadoc
035    public class DBLiteWorkflowLib extends LiteWorkflowLib {
036        private final Connection connection;
037    
038        public DBLiteWorkflowLib(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
039                                 Class<? extends ActionNodeHandler> actionHandlerClass, Connection connection) {
040            super(schema, decisionHandlerClass, actionHandlerClass);
041            this.connection = connection;
042        }
043    
044        /**
045         * Save the Workflow Instance for the given Workflow Application.
046         *
047         * @param instance
048         * @return
049         * @throws WorkflowException
050         */
051        @Override
052        public void insert(WorkflowInstance instance) throws WorkflowException {
053            ParamChecker.notNull(instance, "instance");
054            try {
055                SqlStatement.insertInto(OozieTable.WF_PROCESS_INSTANCE).value(OozieColumn.PI_wfId, instance.getId()).value(
056                        OozieColumn.PI_state, WritableUtils.toByteArray((LiteWorkflowInstance) instance))
057                        .prepareAndSetValues(connection).executeUpdate();
058            }
059            catch (SQLException e) {
060                throw new WorkflowException(ErrorCode.E0713, e.getMessage(), e);
061            }
062        }
063    
064        /**
065         * Loads the Workflow instance with the given ID.
066         *
067         * @param id
068         * @return
069         * @throws WorkflowException
070         */
071        @Override
072        public WorkflowInstance get(String id) throws WorkflowException {
073            ParamChecker.notNull(id, "id");
074            try {
075                ResultSetReader rs = SqlStatement.parse(SqlStatement.selectColumns(OozieColumn.PI_state).where(
076                        SqlStatement.isEqual(OozieColumn.PI_wfId, ParamChecker.notNull(id, "id"))).
077                        prepareAndSetValues(connection).executeQuery());
078                rs.next();
079                LiteWorkflowInstance pInstance = WritableUtils.fromByteArray(rs.getByteArray(OozieColumn.PI_state),
080                                                                             LiteWorkflowInstance.class);
081                return pInstance;
082            }
083            catch (SQLException e) {
084                throw new WorkflowException(ErrorCode.E0713, e.getMessage(), e);
085            }
086        }
087    
088        /**
089         * Updates the Workflow Instance to DB.
090         *
091         * @param instance
092         * @throws WorkflowException
093         */
094        @Override
095        public void update(WorkflowInstance instance) throws WorkflowException {
096            ParamChecker.notNull(instance, "instance");
097            try {
098                SqlStatement.update(OozieTable.WF_PROCESS_INSTANCE).set(OozieColumn.PI_state,
099                                                                        WritableUtils.toByteArray((LiteWorkflowInstance) instance)).where(
100                        SqlStatement.isEqual(OozieColumn.PI_wfId, instance.getId())).
101                        prepareAndSetValues(connection).executeUpdate();
102            }
103            catch (SQLException e) {
104                throw new WorkflowException(ErrorCode.E0713, e.getMessage(), e);
105            }
106        }
107    
108        /**
109         * Delets the Workflow Instance with the given id.
110         *
111         * @param id
112         * @throws WorkflowException
113         */
114        @Override
115        public void delete(String id) throws WorkflowException {
116            ParamChecker.notNull(id, "id");
117            try {
118                SqlStatement.deleteFrom(OozieTable.WF_PROCESS_INSTANCE).where(
119                        SqlStatement.isEqual(OozieColumn.PI_wfId, id)).prepareAndSetValues(connection).executeUpdate();
120            }
121            catch (SQLException e) {
122                throw new WorkflowException(ErrorCode.E0713, e.getMessage(), e);
123            }
124        }
125    
126        @Override
127        public void commit() throws WorkflowException {
128            // NOP
129        }
130    
131        @Override
132        public void close() throws WorkflowException {
133            // NOP
134        }
135    }