/*
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
 */
package org.apache.wiki.search.kendra;

import com.amazonaws.services.kendra.AWSkendra;
import com.amazonaws.services.kendra.AWSkendraClientBuilder;
import com.amazonaws.services.kendra.model.*;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wiki.InternalWikiException;
import org.apache.wiki.WatchDog;
import org.apache.wiki.WikiBackgroundThread;
import org.apache.wiki.api.core.Attachment;
import org.apache.wiki.api.core.Context;
import org.apache.wiki.api.core.Engine;
import org.apache.wiki.api.core.Page;
import org.apache.wiki.api.exceptions.NoRequiredPropertyException;
import org.apache.wiki.api.exceptions.ProviderException;
import org.apache.wiki.api.providers.PageProvider;
import org.apache.wiki.api.search.SearchResult;
import org.apache.wiki.api.spi.Wiki;
import org.apache.wiki.attachment.AttachmentManager;
import org.apache.wiki.auth.AuthorizationManager;
import org.apache.wiki.auth.permissions.PagePermission;
import org.apache.wiki.pages.PageManager;
import org.apache.wiki.search.SearchProvider;
import org.apache.wiki.util.TextUtil;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;

import static java.lang.String.format;

/**
 * Search provider that implements {@link SearchProvider} using AWS Kendra for
 * indexing. Note that we are using a Custom DataSource which limits the
 * attributes that can be uploaded / searched for each page (as per
 * https://docs.aws.amazon.com/kendra/latest/dg/custom-attributes.html). This
 * could be overcome by using an S3 bucket where any custom attributes can be
 * added.
 * <p>
 * Pages are queued through {@link #reindexPage(Page)} and uploaded by a
 * background thread; the Kendra index and data source ids are resolved by that
 * thread in {@link #initializeIndexAndDataSource()}.
 *
 * @since 2.11.0
 */
public class KendraSearchProvider implements SearchProvider {

    private static final Logger LOG = LogManager.getLogger( KendraSearchProvider.class );
    private Engine engine;
    private Properties properties;
    private Map< String, Object > contentTypes;
    private AWSkendra kendra;
    private String indexName;
    private String indexId;
    private String dataSourceName;
    private String dataSourceId;

    /** Pages waiting to be (re)indexed by the background thread. */
    private final List< Page > updates = Collections.synchronizedList( new ArrayList<>() );

    private static final String PROP_KENDRA_INDEX_NAME = "jspwiki.kendra.indexName";
    private static final String PROP_KENDRA_DATA_SOURCE_NAME = "jspwiki.kendra.dataSourceName";
    private static final String PROP_KENDRA_INDEXDELAY = "jspwiki.kendra.indexdelay";
    private static final String PROP_KENDRA_INITIALDELAY = "jspwiki.kendra.initialdelay";

    public KendraSearchProvider() {
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void initialize( final Engine engine, final Properties properties ) throws NoRequiredPropertyException, IOException {
        this.engine = engine;
        this.properties = properties;
        this.contentTypes = getContentTypes();

        setKendra( buildClient() );

        this.indexName = TextUtil.getRequiredProperty( this.properties, PROP_KENDRA_INDEX_NAME );
        this.dataSourceName = TextUtil.getRequiredProperty( this.properties, PROP_KENDRA_DATA_SOURCE_NAME );
        final int initialDelay = TextUtil.getIntegerProperty( this.properties, PROP_KENDRA_INITIALDELAY, KendraUpdater.INITIAL_DELAY );
        final int indexDelay = TextUtil.getIntegerProperty( this.properties, PROP_KENDRA_INDEXDELAY, KendraUpdater.INDEX_DELAY );

        // Start the Kendra update thread, which waits first for a little while
        // before starting to go through the "pages that need updating".
        // A negative initial delay disables the background indexer altogether.
        if ( initialDelay >= 0 ) {
            final KendraUpdater updater = new KendraUpdater( engine, this, initialDelay, indexDelay );
            updater.start();
        }
    }

    /**
     * Loads the {@code content_types.json} classpath resource that sits next to
     * this class and parses it into a map.
     *
     * @return the parsed mapping, or an empty map when the resource is missing,
     *         empty or unreadable (never {@code null})
     */
    private Map< String, Object > getContentTypes() {
        final Gson gson = new GsonBuilder().create();
        try ( final InputStream in = KendraSearchProvider.class.getResourceAsStream( "content_types.json" ) ) {
            if ( in != null ) {
                final Type collectionType = new TypeToken< HashMap< String, Object > >() {
                }.getType();
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                final Map< String, Object > parsed = gson.fromJson( new InputStreamReader( in, StandardCharsets.UTF_8 ), collectionType );
                if ( parsed != null ) {
                    return parsed;
                }
            }
        } catch ( final IOException e ) {
            LOG.error( format( "Unable to load default propertyfile 'content_types.json': %s", e.getMessage() ), e );
        }
        // Returning an empty map keeps getContentType() from failing with a NullPointerException.
        return Collections.emptyMap();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getProviderInfo() {
        return "KendraSearchProvider";
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void pageRemoved( final Page page ) {
        final String pageName = page.getName();
        final BatchDeleteDocumentRequest request = new BatchDeleteDocumentRequest().withIndexId( indexId )
                .withDocumentIdList( pageName );
        final BatchDeleteDocumentResult result = getKendra().batchDeleteDocument( request );
        if ( result.getFailedDocuments().isEmpty() ) {
            LOG.debug( format( "Page '%s' was removed from index", pageName ) );
        } else {
            LOG.error( format( "Failed to remove Page '%s' from index", pageName ) );
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void reindexPage( final Page page ) {
        if ( page != null ) {
            updates.add( page );
            LOG.debug( format( "Scheduling page '%s' for indexing ...", page.getName() ) );
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Collection< SearchResult > findPages( final String query, final Context wikiContext ) throws ProviderException, IOException {
        final QueryRequest request = new QueryRequest().withIndexId( indexId ).withQueryText( query );
        final List< QueryResultItem > items;
        try {
            items = getKendra().query( request ).getResultItems();
        } catch ( final ThrottlingException e ) {
            // Best effort: a throttled query yields no results instead of an error.
            LOG.error( "ThrottlingException. Skipping..." );
            return new ArrayList<>();
        }
        final List< SearchResult > searchResults = new ArrayList<>( items.size() );
        final AuthorizationManager mgr = engine.getManager( AuthorizationManager.class );

        for ( final QueryResultItem item : items ) {
            switch( QueryResultType.fromValue( item.getType() ) ) {
                case DOCUMENT:
                    final String documentId = item.getDocumentId();
                    final String documentExcerpt = item.getDocumentExcerpt().getText();
                    final String scoreConfidence = item.getScoreAttributes().getScoreConfidence();
                    final Page page = this.engine.getManager( PageManager.class ).getPage( documentId, PageProvider.LATEST_VERSION );
                    if ( page != null ) {
                        // Only expose pages the current session is allowed to view.
                        final PagePermission pp = new PagePermission( page, PagePermission.VIEW_ACTION );
                        if ( mgr.checkPermission( wikiContext.getWikiSession(), pp ) ) {
                            final SearchResult searchResult = new SearchResultImpl( page, confidence2score( scoreConfidence ),
                                    new String[]{ documentExcerpt } );
                            searchResults.add( searchResult );
                        } else {
                            LOG.error( format( "Page '%s' is not accessible", documentId ) );
                        }
                    } else {
                        // The index references a page the wiki no longer has: drop it from Kendra.
                        LOG.error(
                                format( "Kendra found a result page '%s' that could not be loaded, removing from index", documentId ) );
                        pageRemoved( Wiki.contents().page( this.engine, documentId ) );
                    }
                    break;
                default:
                    LOG.error( format( "Unknown query result type: %s", item.getType() ) );
            }
        }
        return searchResults;
    }

    /**
     * Resolves the ids of the configured AWS Kendra index and data source.
     *
     * @throws IllegalArgumentException if the configured index, or the configured
     *                                  data source within that index, cannot be found
     */
    public void initializeIndexAndDataSource() {
        this.indexId = getIndexId( indexName );
        if ( this.indexId == null ) {
            final String message = format( "Index '%s' does not exist", indexName );
            LOG.error( message );
            throw new IllegalArgumentException( message );
        }
        this.dataSourceId = getDatasourceId( this.indexId, dataSourceName );
        if ( this.dataSourceId == null ) {
            final String message = format( "Datasource '%s' does not exist in index %s", dataSourceName, indexName );
            LOG.error( message );
            throw new IllegalArgumentException( message );
        }
    }

    /**
     * Given a Kendra index name, returns the corresponding index id, or
     * {@code null} if it does not exist. Result pages are followed through the
     * returned continuation token; no request is issued once the token is
     * {@code null}.
     *
     * @param indexName the name of the index to look up
     * @return the index id or {@code null}
     */
    private String getIndexId( final String indexName ) {
        String nextToken = null;
        do {
            // A null token on the first iteration requests the first page.
            final ListIndicesRequest request = new ListIndicesRequest().withNextToken( nextToken );
            final ListIndicesResult result = getKendra().listIndices( request );
            final List< IndexConfigurationSummary > items = result.getIndexConfigurationSummaryItems();
            if ( items == null || items.isEmpty() ) {
                return null;
            }
            for ( final IndexConfigurationSummary item : items ) {
                if ( StringUtils.equals( item.getName(), indexName ) ) {
                    return item.getId();
                }
            }
            nextToken = result.getNextToken();
        } while ( nextToken != null );
        return null;
    }

    /**
     * Given a Kendra data source name, returns the corresponding data source id,
     * or {@code null} if it does not exist. Every page request carries the index
     * id, and no request is issued once the continuation token is {@code null}.
     *
     * @param indexId        the id of the index that owns the data source
     * @param dataSourceName the name of the datasource to look up
     * @return the datasource id or {@code null}
     */
    private String getDatasourceId( final String indexId, final String dataSourceName ) {
        String nextToken = null;
        do {
            // The index id must be repeated on follow-up pages, not only on the first request.
            final ListDataSourcesRequest request = new ListDataSourcesRequest().withIndexId( indexId ).withNextToken( nextToken );
            final ListDataSourcesResult result = getKendra().listDataSources( request );
            final List< DataSourceSummary > items = result.getSummaryItems();
            if ( items == null || items.isEmpty() ) {
                return null;
            }
            for ( final DataSourceSummary item : items ) {
                if ( StringUtils.equals( item.getName(), dataSourceName ) ) {
                    return item.getId();
                }
            }
            nextToken = result.getNextToken();
        } while ( nextToken != null );
        return null;
    }

    /*
     * Converts a SCORE Confidence from Kendra to an "equivalent" integer score
     */
    private int confidence2score( final String scoreConfidence ) {
        switch ( ScoreConfidence.fromValue( scoreConfidence ) ) {
            case VERY_HIGH:
                return 100;
            case HIGH:
                return 75;
            case MEDIUM:
                return 50;
            case LOW:
                return 25;
            default:
                return 0;
        }
    }

    /**
     * Re-indexes all the pages found in the Wiki. It is mainly used at startup.
     * A data source sync job is only started (and therefore only stopped) when
     * there is at least one page to upload.
     *
     * @throws IOException in case the list of pages can not be read
     */
    private void doFullReindex() throws IOException {
        final Collection< Page > pages;
        try {
            pages = engine.getManager( PageManager.class ).getAllPages();
        } catch ( final ProviderException e ) {
            LOG.error( e.getMessage(), e );
            throw new IOException( e );
        }
        if ( pages.isEmpty() ) {
            return;
        }
        LOG.debug( format( "Indexing all %d pages. Please wait ...", pages.size() ) );
        // Started outside the try block so that a failed start is not followed by a stop request.
        final String executionId = startExecution();
        try {
            for ( final Page page : pages ) {
                // Since I do not want to handle the size limit
                // (https://docs.aws.amazon.com/goto/WebAPI/kendra-2019-02-03/BatchPutDocument)
                // uploading documents one at a time
                indexOnePage( page, executionId );
            }
        } finally {
            stopExecution();
        }
    }

    /**
     * Re-indexes all pages marked as updated. It is used to periodically index
     * pages that have been modified.
     */
    private void doPartialReindex() {
        if ( updates.isEmpty() ) {
            return;
        }
        LOG.debug( "Indexing updated pages. Please wait ..." );
        final String executionId = startExecution();
        // The queue is locked for the whole drain; pages are removed one by one so that
        // anything not yet uploaded stays queued if an upload fails unexpectedly.
        synchronized ( updates ) {
            try {
                while ( !updates.isEmpty() ) {
                    indexOnePage( updates.remove( 0 ), executionId );
                }
            } finally {
                stopExecution();
            }
        }
    }

    /**
     * Starts a data source sync job and returns its execution id, which is
     * required to keep track of the modified documents.
     *
     * @return The execution id
     */
    private String startExecution() {
        final StartDataSourceSyncJobRequest request = new StartDataSourceSyncJobRequest().withIndexId( indexId )
                .withId( dataSourceId );
        final StartDataSourceSyncJobResult result = getKendra().startDataSourceSyncJob( request );
        return result.getExecutionId();
    }

    /**
     * Stops the sync job for the current index id and data source id.
     */
    private void stopExecution() {
        final StopDataSourceSyncJobRequest request = new StopDataSourceSyncJobRequest().withIndexId( indexId ).withId( dataSourceId );
        getKendra().stopDataSourceSyncJob( request );
    }

    /**
     * Indexes one single {@link Page} into the Kendra index. Failures are logged
     * and do not abort the caller's loop.
     *
     * @param page        the {@link Page} to index
     * @param executionId The Execution Id
     */
    private void indexOnePage( final Page page, final String executionId ) {
        final String pageName = page.getName();
        try {
            final Document document = newDocument( page, executionId );
            final BatchPutDocumentRequest request = new BatchPutDocumentRequest().withIndexId( indexId )
                    .withDocuments( document );
            final BatchPutDocumentResult result = getKendra().batchPutDocument( request );
            if ( result.getFailedDocuments().isEmpty() ) {
                LOG.info( format( "Successfully indexed Page '%s' as %s", page.getName(), document.getContentType() ) );
            } else {
                for ( final BatchPutDocumentResponseFailedDocument failedDocument : result.getFailedDocuments() ) {
                    LOG.error( format( "Failed to index Page '%s': %s", failedDocument.getId(), failedDocument.getErrorMessage() ) );
                }
            }
        } catch ( final IOException e ) {
            // Keep the cause in the log so that the failing provider call can be diagnosed.
            LOG.error( format( "Failed to index Page '%s': %s", pageName, e.getMessage() ), e );
        }
    }

    /**
     * Given a {@link Page}, returns the corresponding Kendra {@link Document}.
     * Attachments are uploaded as their raw bytes with a content type derived
     * from the file extension; regular pages are uploaded as UTF-8 plain text.
     *
     * @param page        the {@link Page} to be indexed
     * @param executionId an execution id to identify when the {@link Page} was
     *                    indexed for the last time.
     * @return a {@link Document} containing the searchable attributes.
     * @throws IOException if the {@link Page}'s {@link Attachment} can not be read.
     */
    private Document newDocument( final Page page, final String executionId ) throws IOException {
        final String pageName = page.getName();
        final List< DocumentAttribute > attrs = new ArrayList<>();
        // These 2 are required as per
        // https://docs.aws.amazon.com/kendra/latest/dg/data-source-custom.html#custom-required-attributes
        attrs.add( newAttribute( "_data_source_id", dataSourceId ) );
        attrs.add( newAttribute( "_data_source_sync_job_execution_id", executionId ) );

        final String title = TextUtil.beautifyString( pageName );
        ByteBuffer blob;
        ContentType contentType = ContentType.PLAIN_TEXT;
        if ( page instanceof Attachment ) {
            final Attachment attachment = ( Attachment ) page;
            InputStream is = null;
            try {
                final String filename = attachment.getFileName();
                contentType = getContentType( filename );
                is = engine.getManager( AttachmentManager.class ).getAttachmentStream( attachment );
                blob = ByteBuffer.wrap( IOUtils.toByteArray( is ) );
            } catch ( final ProviderException e ) {
                throw new IOException( e );
            } finally {
                IOUtils.closeQuietly( is, null );
            }
        } else {
            final String text = engine.getManager( PageManager.class ).getPureText( page );
            blob = ByteBuffer.wrap( text.getBytes( StandardCharsets.UTF_8 ) );
        }
        return new Document().withId( pageName ).withTitle( title ).withAttributes( attrs ).withBlob( blob )
                .withContentType( contentType );
    }

    /** Builds a string-valued Kendra document attribute. */
    private DocumentAttribute newAttribute( final String key, final String value ) {
        return new DocumentAttribute().withKey( key ).withValue( new DocumentAttributeValue().withStringValue( value ) );
    }

    /**
     * Maps a file name to a Kendra {@link ContentType} using the
     * {@code "ContentTypes"} entry of the loaded content type mapping, keyed by
     * file extension.
     *
     * @param filename the attachment file name
     * @return the mapped content type, or {@link ContentType#PLAIN_TEXT} when no
     *         mapping is available for the extension
     */
    @SuppressWarnings( "unchecked" )
    private ContentType getContentType( final String filename ) {
        final String extension = FilenameUtils.getExtension( filename );
        final Object mapping = this.contentTypes != null ? this.contentTypes.get( "ContentTypes" ) : null;
        if ( mapping == null ) {
            // No mapping loaded (missing resource or missing key): fall back to plain text.
            return ContentType.PLAIN_TEXT;
        }
        final Map< String, String > ct = ( Map< String, String > ) mapping;
        return ContentType.fromValue( ct.getOrDefault( extension, ContentType.PLAIN_TEXT.name() ) );
    }

    /**
     * Updater thread that updates Kendra indexes.
     */
    private static final class KendraUpdater extends WikiBackgroundThread {
        static final int INDEX_DELAY = 5;
        static final int INITIAL_DELAY = 10;
        private final KendraSearchProvider provider;

        private final int initialDelay;

        private WatchDog watchdog;

        private KendraUpdater( final Engine engine, final KendraSearchProvider provider, final int initialDelay, final int indexDelay ) {
            super( engine, indexDelay );
            this.provider = provider;
            this.initialDelay = initialDelay;
            setName( "JSPWiki Kendra Indexer" );
        }

        /**
         * Waits for the initial delay (in seconds), then resolves the index and
         * data source ids and runs a full reindex.
         */
        @Override
        public void startupTask() throws Exception {
            watchdog = WatchDog.getCurrentWatchDog( getEngine() );
            try {
                Thread.sleep( initialDelay * 1000L );
            } catch ( final InterruptedException e ) {
                // Restore the interrupt flag so that code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new InternalWikiException( "Interrupted while waiting to start.", e );
            }
            watchdog.enterState( "Full reindex" );
            try {
                provider.initializeIndexAndDataSource();
                provider.doFullReindex();
            } finally {
                // Always leave the watchdog state, even when indexing fails.
                watchdog.exitState();
            }
        }

        /**
         * Uploads the pages queued since the previous run.
         */
        @Override
        public void backgroundTask() {
            watchdog.enterState( "Reindexing ...", 60 );
            try {
                provider.doPartialReindex();
            } finally {
                watchdog.exitState();
            }
        }
    }

    /**
     * Simple immutable-by-construction holder for one search hit.
     */
    private static class SearchResultImpl implements SearchResult {

        private final Page page;
        private final int score;
        private final String[] contexts;

        public SearchResultImpl( final Page page, final int score, final String[] contexts ) {
            this.page = page;
            this.score = score;
            this.contexts = contexts != null ? contexts.clone() : null;
        }

        @Override
        public Page getPage() {
            return this.page;
        }

        @Override
        public int getScore() {
            return this.score;
        }

        @Override
        public String[] getContexts() {
            return this.contexts;
        }
    }

    public AWSkendra getKendra() {
        return kendra;
    }

    public void setKendra( final AWSkendra kendra ) {
        this.kendra = kendra;
    }

    /**
     * Builds the AWS Kendra client; overridable so that a different client can be supplied.
     *
     * @return the client built by {@link AWSkendraClientBuilder#defaultClient()}
     */
    protected AWSkendra buildClient() {
        return AWSkendraClientBuilder.defaultClient();
    }

    public String getIndexName() {
        return indexName;
    }

    public String getDataSourceName() {
        return dataSourceName;
    }

}