001/* 
002    Licensed to the Apache Software Foundation (ASF) under one
003    or more contributor license agreements.  See the NOTICE file
004    distributed with this work for additional information
005    regarding copyright ownership.  The ASF licenses this file
006    to you under the Apache License, Version 2.0 (the
007    "License"); you may not use this file except in compliance
008    with the License.  You may obtain a copy of the License at
009
010       http://www.apache.org/licenses/LICENSE-2.0
011
012    Unless required by applicable law or agreed to in writing,
013    software distributed under the License is distributed on an
014    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015    KIND, either express or implied.  See the License for the
016    specific language governing permissions and limitations
017    under the License.  
018 */
019package org.apache.wiki.workflow;
020
021import org.apache.commons.lang3.time.StopWatch;
022import org.apache.log4j.Logger;
023import org.apache.wiki.api.core.Engine;
024import org.apache.wiki.api.core.Session;
025import org.apache.wiki.api.exceptions.WikiException;
026import org.apache.wiki.auth.AuthorizationManager;
027import org.apache.wiki.auth.acl.UnresolvedPrincipal;
028import org.apache.wiki.event.WikiEvent;
029import org.apache.wiki.event.WikiEventEmitter;
030import org.apache.wiki.event.WorkflowEvent;
031
032import java.io.BufferedInputStream;
033import java.io.BufferedOutputStream;
034import java.io.File;
035import java.io.FileInputStream;
036import java.io.FileOutputStream;
037import java.io.IOException;
038import java.io.ObjectInputStream;
039import java.io.ObjectOutputStream;
040import java.security.Principal;
041import java.util.ArrayList;
042import java.util.List;
043import java.util.Map;
044import java.util.Optional;
045import java.util.Properties;
046import java.util.Set;
047import java.util.concurrent.ConcurrentHashMap;
048import java.util.concurrent.CopyOnWriteArrayList;
049
050
051/**
052 * <p>
053 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of
054 * users or groups expected to approve particular Workflows.
055 * </p>
056 */
057public class DefaultWorkflowManager implements WorkflowManager {
058
059    private static final Logger LOG = Logger.getLogger( DefaultWorkflowManager.class );
060    static final String SERIALIZATION_FILE = "wkflmgr.ser";
061
062    /** We use this also a generic serialization id */
063    private static final long serialVersionUID = 6L;
064
065    DecisionQueue m_queue = new DecisionQueue();
066    Set< Workflow > m_workflows;
067    final Map< String, Principal > m_approvers;
068    List< Workflow > m_completed;
069    private Engine m_engine = null;
070
071    /**
072     * Constructs a new WorkflowManager, with an empty workflow cache.
073     */
074    public DefaultWorkflowManager() {
075        m_workflows = ConcurrentHashMap.newKeySet();
076        m_approvers = new ConcurrentHashMap<>();
077        m_completed = new CopyOnWriteArrayList<>();
078        WikiEventEmitter.attach( this );
079    }
080
081    /**
082     * {@inheritDoc}
083     */
084    @Override
085    public Set< Workflow > getWorkflows() {
086        final Set< Workflow > workflows = ConcurrentHashMap.newKeySet();
087        workflows.addAll( m_workflows );
088        return workflows;
089    }
090
091    /**
092     * {@inheritDoc}
093     */
094    @Override
095    public List< Workflow > getCompletedWorkflows() {
096        return new CopyOnWriteArrayList< >( m_completed );
097    }
098
099    /**
100     * {@inheritDoc}
101     *
102     * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given
103     * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role,
104     * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is
105     * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal
106     * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}.
107     */
108    @Override
109    public void initialize( final Engine engine, final Properties props ) {
110        m_engine = engine;
111
112        // Identify the workflows requiring approvals
113        for( final Object o : props.keySet() ) {
114            final String prop = ( String )o;
115            if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) {
116                // For the key, everything after the prefix is the workflow name
117                final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() );
118                if( key.length() > 0 ) {
119                    // Only use non-null/non-blank approvers
120                    final String approver = props.getProperty( prop );
121                    if( approver != null && approver.length() > 0 ) {
122                        m_approvers.put( key, new UnresolvedPrincipal( approver ) );
123                    }
124                }
125            }
126        }
127
128        unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
129    }
130
131    /**
132     *  Reads the serialized data from the disk back to memory.
133     *
134     * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk.
135     */
136    @SuppressWarnings( "unchecked" )
137    synchronized long unserializeFromDisk( final File f ) {
138        long saved = 0L;
139        final StopWatch sw = new StopWatch();
140        sw.start();
141        try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( new FileInputStream( f ) ) ) ) {
142            final long ver = in.readLong();
143            if( ver != serialVersionUID ) {
144                LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." );
145            } else {
146                saved        = in.readLong();
147                m_workflows  = ( Set< Workflow > )in.readObject();
148                m_queue      = ( DecisionQueue )in.readObject();
149                m_completed  = ( List< Workflow > )in.readObject();
150                LOG.debug( "Read serialized data successfully in " + sw );
151            }
152        } catch( final IOException | ClassNotFoundException e ) {
153            LOG.error( "unable to recover from disk workflows and decision queue: " + e.getMessage(), e );
154        }
155        sw.stop();
156
157        return saved;
158    }
159
160    /**
161     *  Serializes workflows and decisionqueue to disk.  The format is private, don't touch it.
162     */
163    synchronized void serializeToDisk( final File f ) {
164        try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( new FileOutputStream( f ) ) ) ) {
165            final StopWatch sw = new StopWatch();
166            sw.start();
167
168            out.writeLong( serialVersionUID );
169            out.writeLong( System.currentTimeMillis() ); // Timestamp
170            out.writeObject( m_workflows );
171            out.writeObject( m_queue );
172            out.writeObject( m_completed );
173
174            sw.stop();
175
176            LOG.debug( "serialization done - took " + sw );
177        } catch( final IOException ioe ) {
178            LOG.error( "Unable to serialize!", ioe );
179        }
180    }
181
182    /**
183     * {@inheritDoc}
184     */
185    @Override
186    public boolean requiresApproval( final String messageKey ) {
187        return  m_approvers.containsKey( messageKey );
188    }
189
190    /**
191     * {@inheritDoc}
192     */
193    @Override
194    public Principal getApprover( final String messageKey ) throws WikiException {
195        Principal approver = m_approvers.get( messageKey );
196        if ( approver == null ) {
197            throw new WikiException( "Workflow '" + messageKey + "' does not require approval." );
198        }
199
200        // Try to resolve UnresolvedPrincipals
201        if ( approver instanceof UnresolvedPrincipal ) {
202            final String name = approver.getName();
203            approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name );
204
205            // If still unresolved, throw exception; otherwise, freshen our cache
206            if ( approver instanceof UnresolvedPrincipal ) {
207                throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." );
208            }
209
210            m_approvers.put( messageKey, approver );
211        }
212        return approver;
213    }
214
215    /**
216     * Protected helper method that returns the associated Engine
217     *
218     * @return the wiki engine
219     */
220    protected Engine getEngine() {
221        if ( m_engine == null ) {
222            throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." );
223        }
224        return m_engine;
225    }
226
227    /**
228     * Returns the DecisionQueue associated with this WorkflowManager
229     *
230     * @return the decision queue
231     */
232    @Override
233    public DecisionQueue getDecisionQueue() {
234        return m_queue;
235    }
236
237    /**
238     * {@inheritDoc}
239     */
240    @Override
241    public List< Workflow > getOwnerWorkflows( final Session session ) {
242        final List< Workflow > workflows = new ArrayList<>();
243        if ( session.isAuthenticated() ) {
244            final Principal[] sessionPrincipals = session.getPrincipals();
245            for( final Workflow w : m_workflows ) {
246                final Principal owner = w.getOwner();
247                for ( final Principal sessionPrincipal : sessionPrincipals ) {
248                    if ( sessionPrincipal.equals( owner ) ) {
249                        workflows.add( w );
250                        break;
251                    }
252                }
253            }
254        }
255        return workflows;
256    }
257
258    /**
259     * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED},
260     * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created,
261     * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue
262     * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}.
263     * 
264     * @param event the event passed to this listener
265     */
266    @Override
267    public void actionPerformed( final WikiEvent event ) {
268        if( event instanceof WorkflowEvent ) {
269            if( event.getSrc() instanceof Workflow ) {
270                final Workflow workflow = event.getSrc();
271                switch( event.getType() ) {
272                // Remove from manager
273                case WorkflowEvent.ABORTED   :
274                case WorkflowEvent.COMPLETED : remove( workflow ); break;
275                // Add to manager
276                case WorkflowEvent.CREATED   : add( workflow ); break;
277                default: break;
278                }
279            } else if( event.getSrc() instanceof Decision ) {
280                final Decision decision = event.getSrc();
281                switch( event.getType() ) {
282                // Add to DecisionQueue
283                case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break;
284                // Remove from DecisionQueue
285                case WorkflowEvent.DQ_REMOVAL  : removeFromDecisionQueue( decision ); break;
286                default: break;
287                }
288            }
289            serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
290        }
291    }
292
293    /**
294     * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and
295     * {@code Id} properties if not set.
296     *
297     * @param workflow the workflow to add
298     */
299    protected void add( final Workflow workflow ) {
300        m_workflows.add( workflow );
301    }
302
303    /**
304     * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method
305     * defensively checks to see if the workflow has not yet been removed.
306     *
307     * @param workflow the workflow to remove
308     */
309    protected void remove( final Workflow workflow ) {
310        if( m_workflows.contains( workflow ) ) {
311            m_workflows.remove( workflow );
312            m_completed.add( workflow );
313        }
314    }
315
316    protected void removeFromDecisionQueue( final Decision decision ) {
317        // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue
318        final int workflowId = decision.getWorkflowId();
319        final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny();
320        if( optw.isPresent() ) {
321            final Workflow w = optw.get();
322            if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) {
323                getDecisionQueue().remove( decision );
324                // Restart workflow
325                try {
326                    w.restart();
327                } catch( final WikiException e ) {
328                    LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e );
329                }
330            }
331        }
332    }
333
334    protected void addToDecisionQueue( final Decision decision ) {
335        getDecisionQueue().add( decision );
336    }
337
338}