019package org.apache.wiki.workflow;
021import org.apache.commons.collections4.queue.CircularFifoQueue;
022import org.apache.commons.lang3.time.StopWatch;
023import org.apache.logging.log4j.LogManager;
024import org.apache.logging.log4j.Logger;
025import org.apache.wiki.api.core.Context;
026import org.apache.wiki.api.core.Engine;
027import org.apache.wiki.api.core.Session;
028import org.apache.wiki.api.exceptions.WikiException;
029import org.apache.wiki.auth.AuthorizationManager;
030import org.apache.wiki.auth.acl.UnresolvedPrincipal;
031import org.apache.wiki.event.WikiEvent;
032import org.apache.wiki.event.WikiEventEmitter;
033import org.apache.wiki.event.WorkflowEvent;
034import org.apache.wiki.util.TextUtil;
036import java.io.*;
037import java.nio.file.Files;
038import java.security.Principal;
039import java.util.*;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.CopyOnWriteArrayList;
045 * <p>
046 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of
047 * users or groups expected to approve particular Workflows.
048 * </p>
049 */
050public class DefaultWorkflowManager implements WorkflowManager, Serializable {
052    private static final Logger LOG = LogManager.getLogger( DefaultWorkflowManager.class );
053    static final String SERIALIZATION_FILE = "wkflmgr.ser";
055    /** We use this also a generic serialization id */
056    private static final long serialVersionUID = 6L;
058    DecisionQueue m_queue;
059    Set< Workflow > m_workflows;
060    final Map< String, Principal > m_approvers;
061    Queue< Workflow > m_completed;
062    private Engine m_engine;
063    private int retainCompleted;
065    /**
066     * Constructs a new WorkflowManager, with an empty workflow cache.
067     */
068    public DefaultWorkflowManager() {
069        m_workflows = ConcurrentHashMap.newKeySet();
070        m_approvers = new ConcurrentHashMap<>();
071        m_queue = new DecisionQueue();
072        WikiEventEmitter.attach( this );
073    }
075    /**
076     * {@inheritDoc}
077     */
078    @Override
079    public Set< Workflow > getWorkflows() {
080        final Set< Workflow > workflows = ConcurrentHashMap.newKeySet();
081        workflows.addAll( m_workflows );
082        return workflows;
083    }
085    /**
086     * {@inheritDoc}
087     */
088    @Override
089    public Map< Integer, Workflow > getWorkflowsAsMap() {
090        final Map< Integer, Workflow > workflows = new ConcurrentHashMap<>();
091        m_workflows.forEach( w -> workflows.put( w.getId(), w ) );
092        return workflows;
093    }
095    /**
096     * {@inheritDoc}
097     */
098    @Override
099    public List< Workflow > getCompletedWorkflows() {
100        return new CopyOnWriteArrayList< >( m_completed );
101    }
103    /**
104     * {@inheritDoc}
105     *
106     * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given
107     * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role,
108     * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is
109     * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal
110     * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}.
111     */
112    @Override
113    public void initialize( final Engine engine, final Properties props ) {
114        m_engine = engine;
115        retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 );
116        m_completed = new CircularFifoQueue<>( retainCompleted );
118        // Identify the workflows requiring approvals
119        for( final Object o : props.keySet() ) {
120            final String prop = ( String )o;
121            if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) {
122                // For the key, everything after the prefix is the workflow name
123                final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() );
124                if(!key.isEmpty()) {
125                    // Only use non-null/non-blank approvers
126                    final String approver = props.getProperty( prop );
127                    if( approver != null && !approver.isEmpty() ) {
128                        m_approvers.put( key, new UnresolvedPrincipal( approver ) );
129                    }
130                }
131            }
132        }
134        unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
135    }
137    /**
138     *  Reads the serialized data from the disk back to memory.
139     *
140     * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk.
141     */
142    @SuppressWarnings( "unchecked" )
143    synchronized long unserializeFromDisk( final File f ) {
144        long saved = 0L;
145        final StopWatch sw = new StopWatch();
146        sw.start();
147        try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( Files.newInputStream( f.toPath() ) ) ) ) {
148            final long ver = in.readLong();
149            if( ver != serialVersionUID ) {
150                LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." );
151            } else {
152                saved        = in.readLong();
153                m_workflows  = ( Set< Workflow > )in.readObject();
154                m_queue      = ( DecisionQueue )in.readObject();
155                m_completed = new CircularFifoQueue<>( retainCompleted );
156                m_completed.addAll( ( Collection< Workflow > )in.readObject() );
157                LOG.debug( "Read serialized data successfully in " + sw );
158            }
159        } catch( final IOException | ClassNotFoundException e ) {
160            LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() );
161        }
162        sw.stop();
164        return saved;
165    }
167    /**
168     *  Serializes workflows and decisionqueue to disk.  The format is private, don't touch it.
169     */
170    synchronized void serializeToDisk( final File f ) {
171        try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( Files.newOutputStream( f.toPath() ) ) ) ) {
172            final StopWatch sw = new StopWatch();
173            sw.start();
175            out.writeLong( serialVersionUID );
176            out.writeLong( System.currentTimeMillis() ); // Timestamp
177            out.writeObject( m_workflows );
178            out.writeObject( m_queue );
179            out.writeObject( m_completed );
181            sw.stop();
183            LOG.debug( "serialization done - took " + sw );
184        } catch( final IOException ioe ) {
185            LOG.error( "Unable to serialize!", ioe );
186        }
187    }
189    /**
190     * {@inheritDoc}
191     */
192    @Override
193    public boolean requiresApproval( final String messageKey ) {
194        return  m_approvers.containsKey( messageKey );
195    }
197    /**
198     * {@inheritDoc}
199     */
200    @Override
201    public Principal getApprover( final String messageKey ) throws WikiException {
202        Principal approver = m_approvers.get( messageKey );
203        if ( approver == null ) {
204            throw new WikiException( "Workflow '" + messageKey + "' does not require approval." );
205        }
207        // Try to resolve UnresolvedPrincipals
208        if ( approver instanceof UnresolvedPrincipal ) {
209            final String name = approver.getName();
210            approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name );
212            // If still unresolved, throw exception; otherwise, freshen our cache
213            if ( approver instanceof UnresolvedPrincipal ) {
214                throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." );
215            }
217            m_approvers.put( messageKey, approver );
218        }
219        return approver;
220    }
222    /**
223     * Protected helper method that returns the associated Engine
224     *
225     * @return the wiki engine
226     */
227    protected Engine getEngine() {
228        if ( m_engine == null ) {
229            throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." );
230        }
231        return m_engine;
232    }
234    /**
235     * Returns the DecisionQueue associated with this WorkflowManager
236     *
237     * @return the decision queue
238     */
239    @Override
240    public DecisionQueue getDecisionQueue() {
241        return m_queue;
242    }
244    /**
245     * {@inheritDoc}
246     */
247    @Override
248    public List< Workflow > getOwnerWorkflows( final Session session ) {
249        final List< Workflow > workflows = new ArrayList<>();
250        if ( session.isAuthenticated() ) {
251            final Principal[] sessionPrincipals = session.getPrincipals();
252            for( final Workflow w : m_workflows ) {
253                final Principal owner = w.getOwner();
254                if (Arrays.stream(sessionPrincipals).anyMatch(sessionPrincipal -> sessionPrincipal.equals(owner))) {
255                    workflows.add(w);
256                }
257            }
258        }
259        return workflows;
260    }
262    /**
263     * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED},
264     * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created,
265     * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue
266     * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}.
267     * 
268     * @param event the event passed to this listener
269     */
270    @Override
271    public void actionPerformed( final WikiEvent event ) {
272        if( event instanceof WorkflowEvent ) {
273            if( event.getSrc() instanceof Workflow ) {
274                final Workflow workflow = event.getSrc();
275                switch( event.getType() ) {
276                // Remove from manager
277                case WorkflowEvent.ABORTED   :
278                case WorkflowEvent.COMPLETED : remove( workflow ); break;
279                // Add to manager
280                case WorkflowEvent.CREATED   : add( workflow ); break;
281                default: break;
282                }
283            } else if( event.getSrc() instanceof Decision ) {
284                final Decision decision = event.getSrc();
285                switch( event.getType() ) {
286                // Add to DecisionQueue
287                case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break;
288                // Remove from DecisionQueue
289                case WorkflowEvent.DQ_REMOVAL  : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break;
290                default: break;
291                }
292            }
293            serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
294        }
295    }
297    /**
298     * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and
299     * {@code Id} properties if not set.
300     *
301     * @param workflow the workflow to add
302     */
303    protected void add( final Workflow workflow ) {
304        m_workflows.add( workflow );
305    }
307    /**
308     * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method
309     * defensively checks to see if the workflow has not yet been removed.
310     *
311     * @param workflow the workflow to remove
312     */
313    protected void remove( final Workflow workflow ) {
314        if( m_workflows.contains( workflow ) ) {
315            m_workflows.remove( workflow );
316            m_completed.add( workflow );
317        }
318    }
320    protected void removeFromDecisionQueue( final Decision decision, final Context context ) {
321        // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue
322        final int workflowId = decision.getWorkflowId();
323        final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny();
324        if( optw.isPresent() ) {
325            final Workflow w = optw.get();
326            if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) {
327                getDecisionQueue().remove( decision );
328                // Restart workflow
329                try {
330                    w.restart( context );
331                } catch( final WikiException e ) {
332                    LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e );
333                }
334            }
335        }
336    }
338    protected void addToDecisionQueue( final Decision decision ) {
339        getDecisionQueue().add( decision );
340    }