001/* 002 Licensed to the Apache Software Foundation (ASF) under one 003 or more contributor license agreements. See the NOTICE file 004 distributed with this work for additional information 005 regarding copyright ownership. The ASF licenses this file 006 to you under the Apache License, Version 2.0 (the 007 "License"); you may not use this file except in compliance 008 with the License. You may obtain a copy of the License at 009 010 http://www.apache.org/licenses/LICENSE-2.0 011 012 Unless required by applicable law or agreed to in writing, 013 software distributed under the License is distributed on an 014 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 KIND, either express or implied. See the License for the 016 specific language governing permissions and limitations 017 under the License. 018 */ 019package org.apache.wiki.workflow; 020 021import org.apache.commons.lang3.time.StopWatch; 022import org.apache.log4j.Logger; 023import org.apache.wiki.api.core.Engine; 024import org.apache.wiki.api.core.Session; 025import org.apache.wiki.api.exceptions.WikiException; 026import org.apache.wiki.auth.AuthorizationManager; 027import org.apache.wiki.auth.acl.UnresolvedPrincipal; 028import org.apache.wiki.event.WikiEvent; 029import org.apache.wiki.event.WikiEventEmitter; 030import org.apache.wiki.event.WorkflowEvent; 031 032import java.io.BufferedInputStream; 033import java.io.BufferedOutputStream; 034import java.io.File; 035import java.io.FileInputStream; 036import java.io.FileOutputStream; 037import java.io.IOException; 038import java.io.ObjectInputStream; 039import java.io.ObjectOutputStream; 040import java.security.Principal; 041import java.util.ArrayList; 042import java.util.List; 043import java.util.Map; 044import java.util.Optional; 045import java.util.Properties; 046import java.util.Set; 047import java.util.concurrent.ConcurrentHashMap; 048import java.util.concurrent.CopyOnWriteArrayList; 049 050 051/** 052 * <p> 053 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of 054 * users or groups expected to approve particular Workflows. 055 * </p> 056 */ 057public class DefaultWorkflowManager implements WorkflowManager { 058 059 private static final Logger LOG = Logger.getLogger( DefaultWorkflowManager.class ); 060 static final String SERIALIZATION_FILE = "wkflmgr.ser"; 061 062 /** We use this also a generic serialization id */ 063 private static final long serialVersionUID = 6L; 064 065 DecisionQueue m_queue = new DecisionQueue(); 066 Set< Workflow > m_workflows; 067 final Map< String, Principal > m_approvers; 068 List< Workflow > m_completed; 069 private Engine m_engine = null; 070 071 /** 072 * Constructs a new WorkflowManager, with an empty workflow cache. 073 */ 074 public DefaultWorkflowManager() { 075 m_workflows = ConcurrentHashMap.newKeySet(); 076 m_approvers = new ConcurrentHashMap<>(); 077 m_completed = new CopyOnWriteArrayList<>(); 078 WikiEventEmitter.attach( this ); 079 } 080 081 /** 082 * {@inheritDoc} 083 */ 084 @Override 085 public Set< Workflow > getWorkflows() { 086 final Set< Workflow > workflows = ConcurrentHashMap.newKeySet(); 087 workflows.addAll( m_workflows ); 088 return workflows; 089 } 090 091 /** 092 * {@inheritDoc} 093 */ 094 @Override 095 public List< Workflow > getCompletedWorkflows() { 096 return new CopyOnWriteArrayList< >( m_completed ); 097 } 098 099 /** 100 * {@inheritDoc} 101 * 102 * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given 103 * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role, 104 * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is 105 * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal 106 * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}. 107 */ 108 @Override 109 public void initialize( final Engine engine, final Properties props ) { 110 m_engine = engine; 111 112 // Identify the workflows requiring approvals 113 for( final Object o : props.keySet() ) { 114 final String prop = ( String )o; 115 if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) { 116 // For the key, everything after the prefix is the workflow name 117 final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() ); 118 if( key.length() > 0 ) { 119 // Only use non-null/non-blank approvers 120 final String approver = props.getProperty( prop ); 121 if( approver != null && approver.length() > 0 ) { 122 m_approvers.put( key, new UnresolvedPrincipal( approver ) ); 123 } 124 } 125 } 126 } 127 128 unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 129 } 130 131 /** 132 * Reads the serialized data from the disk back to memory. 133 * 134 * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk. 135 */ 136 @SuppressWarnings( "unchecked" ) 137 synchronized long unserializeFromDisk( final File f ) { 138 long saved = 0L; 139 final StopWatch sw = new StopWatch(); 140 sw.start(); 141 try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( new FileInputStream( f ) ) ) ) { 142 final long ver = in.readLong(); 143 if( ver != serialVersionUID ) { 144 LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." ); 145 } else { 146 saved = in.readLong(); 147 m_workflows = ( Set< Workflow > )in.readObject(); 148 m_queue = ( DecisionQueue )in.readObject(); 149 m_completed = ( List< Workflow > )in.readObject(); 150 LOG.debug( "Read serialized data successfully in " + sw ); 151 } 152 } catch( final IOException | ClassNotFoundException e ) { 153 LOG.error( "unable to recover from disk workflows and decision queue: " + e.getMessage(), e ); 154 } 155 sw.stop(); 156 157 return saved; 158 } 159 160 /** 161 * Serializes workflows and decisionqueue to disk. The format is private, don't touch it. 162 */ 163 synchronized void serializeToDisk( final File f ) { 164 try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( new FileOutputStream( f ) ) ) ) { 165 final StopWatch sw = new StopWatch(); 166 sw.start(); 167 168 out.writeLong( serialVersionUID ); 169 out.writeLong( System.currentTimeMillis() ); // Timestamp 170 out.writeObject( m_workflows ); 171 out.writeObject( m_queue ); 172 out.writeObject( m_completed ); 173 174 sw.stop(); 175 176 LOG.debug( "serialization done - took " + sw ); 177 } catch( final IOException ioe ) { 178 LOG.error( "Unable to serialize!", ioe ); 179 } 180 } 181 182 /** 183 * {@inheritDoc} 184 */ 185 @Override 186 public boolean requiresApproval( final String messageKey ) { 187 return m_approvers.containsKey( messageKey ); 188 } 189 190 /** 191 * {@inheritDoc} 192 */ 193 @Override 194 public Principal getApprover( final String messageKey ) throws WikiException { 195 Principal approver = m_approvers.get( messageKey ); 196 if ( approver == null ) { 197 throw new WikiException( "Workflow '" + messageKey + "' does not require approval." ); 198 } 199 200 // Try to resolve UnresolvedPrincipals 201 if ( approver instanceof UnresolvedPrincipal ) { 202 final String name = approver.getName(); 203 approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name ); 204 205 // If still unresolved, throw exception; otherwise, freshen our cache 206 if ( approver instanceof UnresolvedPrincipal ) { 207 throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." ); 208 } 209 210 m_approvers.put( messageKey, approver ); 211 } 212 return approver; 213 } 214 215 /** 216 * Protected helper method that returns the associated Engine 217 * 218 * @return the wiki engine 219 */ 220 protected Engine getEngine() { 221 if ( m_engine == null ) { 222 throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." ); 223 } 224 return m_engine; 225 } 226 227 /** 228 * Returns the DecisionQueue associated with this WorkflowManager 229 * 230 * @return the decision queue 231 */ 232 @Override 233 public DecisionQueue getDecisionQueue() { 234 return m_queue; 235 } 236 237 /** 238 * {@inheritDoc} 239 */ 240 @Override 241 public List< Workflow > getOwnerWorkflows( final Session session ) { 242 final List< Workflow > workflows = new ArrayList<>(); 243 if ( session.isAuthenticated() ) { 244 final Principal[] sessionPrincipals = session.getPrincipals(); 245 for( final Workflow w : m_workflows ) { 246 final Principal owner = w.getOwner(); 247 for ( final Principal sessionPrincipal : sessionPrincipals ) { 248 if ( sessionPrincipal.equals( owner ) ) { 249 workflows.add( w ); 250 break; 251 } 252 } 253 } 254 } 255 return workflows; 256 } 257 258 /** 259 * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED}, 260 * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created, 261 * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue 262 * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}. 263 * 264 * @param event the event passed to this listener 265 */ 266 @Override 267 public void actionPerformed( final WikiEvent event ) { 268 if( event instanceof WorkflowEvent ) { 269 if( event.getSrc() instanceof Workflow ) { 270 final Workflow workflow = event.getSrc(); 271 switch( event.getType() ) { 272 // Remove from manager 273 case WorkflowEvent.ABORTED : 274 case WorkflowEvent.COMPLETED : remove( workflow ); break; 275 // Add to manager 276 case WorkflowEvent.CREATED : add( workflow ); break; 277 default: break; 278 } 279 } else if( event.getSrc() instanceof Decision ) { 280 final Decision decision = event.getSrc(); 281 switch( event.getType() ) { 282 // Add to DecisionQueue 283 case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break; 284 // Remove from DecisionQueue 285 case WorkflowEvent.DQ_REMOVAL : removeFromDecisionQueue( decision ); break; 286 default: break; 287 } 288 } 289 serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 290 } 291 } 292 293 /** 294 * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and 295 * {@code Id} properties if not set. 296 * 297 * @param workflow the workflow to add 298 */ 299 protected void add( final Workflow workflow ) { 300 m_workflows.add( workflow ); 301 } 302 303 /** 304 * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method 305 * defensively checks to see if the workflow has not yet been removed. 306 * 307 * @param workflow the workflow to remove 308 */ 309 protected void remove( final Workflow workflow ) { 310 if( m_workflows.contains( workflow ) ) { 311 m_workflows.remove( workflow ); 312 m_completed.add( workflow ); 313 } 314 } 315 316 protected void removeFromDecisionQueue( final Decision decision ) { 317 // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue 318 final int workflowId = decision.getWorkflowId(); 319 final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny(); 320 if( optw.isPresent() ) { 321 final Workflow w = optw.get(); 322 if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) { 323 getDecisionQueue().remove( decision ); 324 // Restart workflow 325 try { 326 w.restart(); 327 } catch( final WikiException e ) { 328 LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e ); 329 } 330 } 331 } 332 } 333 334 protected void addToDecisionQueue( final Decision decision ) { 335 getDecisionQueue().add( decision ); 336 } 337 338}