001/* 002 Licensed to the Apache Software Foundation (ASF) under one 003 or more contributor license agreements. See the NOTICE file 004 distributed with this work for additional information 005 regarding copyright ownership. The ASF licenses this file 006 to you under the Apache License, Version 2.0 (the 007 "License"); you may not use this file except in compliance 008 with the License. You may obtain a copy of the License at 009 010 http://www.apache.org/licenses/LICENSE-2.0 011 012 Unless required by applicable law or agreed to in writing, 013 software distributed under the License is distributed on an 014 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 KIND, either express or implied. See the License for the 016 specific language governing permissions and limitations 017 under the License. 018 */ 019package org.apache.wiki.workflow; 020 021import org.apache.commons.collections4.queue.CircularFifoQueue; 022import org.apache.commons.lang3.time.StopWatch; 023import org.apache.log4j.Logger; 024import org.apache.wiki.api.core.Context; 025import org.apache.wiki.api.core.Engine; 026import org.apache.wiki.api.core.Session; 027import org.apache.wiki.api.exceptions.WikiException; 028import org.apache.wiki.auth.AuthorizationManager; 029import org.apache.wiki.auth.acl.UnresolvedPrincipal; 030import org.apache.wiki.event.WikiEvent; 031import org.apache.wiki.event.WikiEventEmitter; 032import org.apache.wiki.event.WorkflowEvent; 033import org.apache.wiki.util.TextUtil; 034 035import java.io.*; 036import java.security.Principal; 037import java.util.*; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.CopyOnWriteArrayList; 040 041 042/** 043 * <p> 044 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of 045 * users or groups expected to approve particular Workflows. 046 * </p> 047 */ 048public class DefaultWorkflowManager implements WorkflowManager { 049 050 private static final Logger LOG = Logger.getLogger( DefaultWorkflowManager.class ); 051 static final String SERIALIZATION_FILE = "wkflmgr.ser"; 052 053 /** We use this also a generic serialization id */ 054 private static final long serialVersionUID = 6L; 055 056 DecisionQueue m_queue; 057 Set< Workflow > m_workflows; 058 final Map< String, Principal > m_approvers; 059 Queue< Workflow > m_completed; 060 private Engine m_engine = null; 061 private int retainCompleted; 062 063 /** 064 * Constructs a new WorkflowManager, with an empty workflow cache. 065 */ 066 public DefaultWorkflowManager() { 067 m_workflows = ConcurrentHashMap.newKeySet(); 068 m_approvers = new ConcurrentHashMap<>(); 069 m_queue = new DecisionQueue(); 070 WikiEventEmitter.attach( this ); 071 } 072 073 /** 074 * {@inheritDoc} 075 */ 076 @Override 077 public Set< Workflow > getWorkflows() { 078 final Set< Workflow > workflows = ConcurrentHashMap.newKeySet(); 079 workflows.addAll( m_workflows ); 080 return workflows; 081 } 082 083 /** 084 * {@inheritDoc} 085 */ 086 @Override 087 public List< Workflow > getCompletedWorkflows() { 088 return new CopyOnWriteArrayList< >( m_completed ); 089 } 090 091 /** 092 * {@inheritDoc} 093 * 094 * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given 095 * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role, 096 * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is 097 * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal 098 * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}. 099 */ 100 @Override 101 public void initialize( final Engine engine, final Properties props ) { 102 m_engine = engine; 103 retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 ); 104 m_completed = new CircularFifoQueue<>( retainCompleted ); 105 106 // Identify the workflows requiring approvals 107 for( final Object o : props.keySet() ) { 108 final String prop = ( String )o; 109 if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) { 110 // For the key, everything after the prefix is the workflow name 111 final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() ); 112 if( key.length() > 0 ) { 113 // Only use non-null/non-blank approvers 114 final String approver = props.getProperty( prop ); 115 if( approver != null && approver.length() > 0 ) { 116 m_approvers.put( key, new UnresolvedPrincipal( approver ) ); 117 } 118 } 119 } 120 } 121 122 unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 123 } 124 125 /** 126 * Reads the serialized data from the disk back to memory. 127 * 128 * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk. 129 */ 130 @SuppressWarnings( "unchecked" ) 131 synchronized long unserializeFromDisk( final File f ) { 132 long saved = 0L; 133 final StopWatch sw = new StopWatch(); 134 sw.start(); 135 try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( new FileInputStream( f ) ) ) ) { 136 final long ver = in.readLong(); 137 if( ver != serialVersionUID ) { 138 LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." ); 139 } else { 140 saved = in.readLong(); 141 m_workflows = ( Set< Workflow > )in.readObject(); 142 m_queue = ( DecisionQueue )in.readObject(); 143 m_completed = new CircularFifoQueue<>( retainCompleted ); 144 m_completed.addAll( ( Collection< Workflow > )in.readObject() ); 145 LOG.debug( "Read serialized data successfully in " + sw ); 146 } 147 } catch( final IOException | ClassNotFoundException e ) { 148 LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() ); 149 } 150 sw.stop(); 151 152 return saved; 153 } 154 155 /** 156 * Serializes workflows and decisionqueue to disk. The format is private, don't touch it. 157 */ 158 synchronized void serializeToDisk( final File f ) { 159 try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( new FileOutputStream( f ) ) ) ) { 160 final StopWatch sw = new StopWatch(); 161 sw.start(); 162 163 out.writeLong( serialVersionUID ); 164 out.writeLong( System.currentTimeMillis() ); // Timestamp 165 out.writeObject( m_workflows ); 166 out.writeObject( m_queue ); 167 out.writeObject( m_completed ); 168 169 sw.stop(); 170 171 LOG.debug( "serialization done - took " + sw ); 172 } catch( final IOException ioe ) { 173 LOG.error( "Unable to serialize!", ioe ); 174 } 175 } 176 177 /** 178 * {@inheritDoc} 179 */ 180 @Override 181 public boolean requiresApproval( final String messageKey ) { 182 return m_approvers.containsKey( messageKey ); 183 } 184 185 /** 186 * {@inheritDoc} 187 */ 188 @Override 189 public Principal getApprover( final String messageKey ) throws WikiException { 190 Principal approver = m_approvers.get( messageKey ); 191 if ( approver == null ) { 192 throw new WikiException( "Workflow '" + messageKey + "' does not require approval." ); 193 } 194 195 // Try to resolve UnresolvedPrincipals 196 if ( approver instanceof UnresolvedPrincipal ) { 197 final String name = approver.getName(); 198 approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name ); 199 200 // If still unresolved, throw exception; otherwise, freshen our cache 201 if ( approver instanceof UnresolvedPrincipal ) { 202 throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." ); 203 } 204 205 m_approvers.put( messageKey, approver ); 206 } 207 return approver; 208 } 209 210 /** 211 * Protected helper method that returns the associated Engine 212 * 213 * @return the wiki engine 214 */ 215 protected Engine getEngine() { 216 if ( m_engine == null ) { 217 throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." ); 218 } 219 return m_engine; 220 } 221 222 /** 223 * Returns the DecisionQueue associated with this WorkflowManager 224 * 225 * @return the decision queue 226 */ 227 @Override 228 public DecisionQueue getDecisionQueue() { 229 return m_queue; 230 } 231 232 /** 233 * {@inheritDoc} 234 */ 235 @Override 236 public List< Workflow > getOwnerWorkflows( final Session session ) { 237 final List< Workflow > workflows = new ArrayList<>(); 238 if ( session.isAuthenticated() ) { 239 final Principal[] sessionPrincipals = session.getPrincipals(); 240 for( final Workflow w : m_workflows ) { 241 final Principal owner = w.getOwner(); 242 for ( final Principal sessionPrincipal : sessionPrincipals ) { 243 if ( sessionPrincipal.equals( owner ) ) { 244 workflows.add( w ); 245 break; 246 } 247 } 248 } 249 } 250 return workflows; 251 } 252 253 /** 254 * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED}, 255 * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created, 256 * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue 257 * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}. 258 * 259 * @param event the event passed to this listener 260 */ 261 @Override 262 public void actionPerformed( final WikiEvent event ) { 263 if( event instanceof WorkflowEvent ) { 264 if( event.getSrc() instanceof Workflow ) { 265 final Workflow workflow = event.getSrc(); 266 switch( event.getType() ) { 267 // Remove from manager 268 case WorkflowEvent.ABORTED : 269 case WorkflowEvent.COMPLETED : remove( workflow ); break; 270 // Add to manager 271 case WorkflowEvent.CREATED : add( workflow ); break; 272 default: break; 273 } 274 } else if( event.getSrc() instanceof Decision ) { 275 final Decision decision = event.getSrc(); 276 switch( event.getType() ) { 277 // Add to DecisionQueue 278 case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break; 279 // Remove from DecisionQueue 280 case WorkflowEvent.DQ_REMOVAL : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break; 281 default: break; 282 } 283 } 284 serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 285 } 286 } 287 288 /** 289 * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and 290 * {@code Id} properties if not set. 291 * 292 * @param workflow the workflow to add 293 */ 294 protected void add( final Workflow workflow ) { 295 m_workflows.add( workflow ); 296 } 297 298 /** 299 * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method 300 * defensively checks to see if the workflow has not yet been removed. 301 * 302 * @param workflow the workflow to remove 303 */ 304 protected void remove( final Workflow workflow ) { 305 if( m_workflows.contains( workflow ) ) { 306 m_workflows.remove( workflow ); 307 m_completed.add( workflow ); 308 } 309 } 310 311 protected void removeFromDecisionQueue( final Decision decision, final Context context ) { 312 // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue 313 final int workflowId = decision.getWorkflowId(); 314 final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny(); 315 if( optw.isPresent() ) { 316 final Workflow w = optw.get(); 317 if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) { 318 getDecisionQueue().remove( decision ); 319 // Restart workflow 320 try { 321 w.restart( context ); 322 } catch( final WikiException e ) { 323 LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e ); 324 } 325 } 326 } 327 } 328 329 protected void addToDecisionQueue( final Decision decision ) { 330 getDecisionQueue().add( decision ); 331 } 332 333}