001/*
002    Licensed to the Apache Software Foundation (ASF) under one
003    or more contributor license agreements.  See the NOTICE file
004    distributed with this work for additional information
005    regarding copyright ownership.  The ASF licenses this file
006    to you under the Apache License, Version 2.0 (the
007    "License"); you may not use this file except in compliance
008    with the License.  You may obtain a copy of the License at
009
010       http://www.apache.org/licenses/LICENSE-2.0
011
012    Unless required by applicable law or agreed to in writing,
013    software distributed under the License is distributed on an
014    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015    KIND, either express or implied.  See the License for the
016    specific language governing permissions and limitations
017    under the License.    
018 */
019package org.apache.wiki.search.kendra;
020
021import com.amazonaws.services.kendra.AWSkendra;
022import com.amazonaws.services.kendra.AWSkendraClientBuilder;
023import com.amazonaws.services.kendra.model.*;
024import com.amazonaws.util.IOUtils;
025import com.google.gson.Gson;
026import com.google.gson.GsonBuilder;
027import com.google.gson.reflect.TypeToken;
028import org.apache.commons.io.FilenameUtils;
029import org.apache.commons.lang3.StringUtils;
030import org.apache.logging.log4j.LogManager;
031import org.apache.logging.log4j.Logger;
032import org.apache.wiki.InternalWikiException;
033import org.apache.wiki.WatchDog;
034import org.apache.wiki.WikiBackgroundThread;
035import org.apache.wiki.api.core.Attachment;
036import org.apache.wiki.api.core.Context;
037import org.apache.wiki.api.core.Engine;
038import org.apache.wiki.api.core.Page;
039import org.apache.wiki.api.exceptions.NoRequiredPropertyException;
040import org.apache.wiki.api.exceptions.ProviderException;
041import org.apache.wiki.api.providers.PageProvider;
042import org.apache.wiki.api.search.SearchResult;
043import org.apache.wiki.api.spi.Wiki;
044import org.apache.wiki.attachment.AttachmentManager;
045import org.apache.wiki.auth.AuthorizationManager;
046import org.apache.wiki.auth.permissions.PagePermission;
047import org.apache.wiki.pages.PageManager;
048import org.apache.wiki.search.SearchProvider;
049import org.apache.wiki.util.TextUtil;
050
051import java.io.IOException;
052import java.io.InputStream;
053import java.io.InputStreamReader;
054import java.lang.reflect.Type;
055import java.nio.ByteBuffer;
056import java.nio.charset.StandardCharsets;
057import java.util.*;
058
059import static java.lang.String.format;
060
061/**
 * Search provider that implements {@link SearchProvider} using AWS Kendra for
063 * indexing. Note that we are using a Custom DataSource which limits the
064 * attributes that can be uploaded / searched for each page (as per
065 * https://docs.aws.amazon.com/kendra/latest/dg/custom-attributes.html). This
066 * could be overcome by using an S3 bucket where any custom attributes can be
067 * added.
068 *
069 * @since 2.11.0
070 */
071public class KendraSearchProvider implements SearchProvider {
072
073    private static final Logger LOG = LogManager.getLogger( KendraSearchProvider.class );
074    private Engine engine;
075    private Properties properties;
076    private Map< String, Object > contentTypes;
077    private AWSkendra kendra;
078    private String indexName;
079    private String indexId;
080    private String dataSourceName;
081    private String dataSourceId;
082
083    private final List< Page > updates = Collections.synchronizedList( new ArrayList<>() );
084
085    private static final String PROP_KENDRA_INDEX_NAME = "jspwiki.kendra.indexName";
086    private static final String PROP_KENDRA_DATA_SOURCE_NAME = "jspwiki.kendra.dataSourceName";
087    private static final String PROP_KENDRA_INDEXDELAY = "jspwiki.kendra.indexdelay";
088    private static final String PROP_KENDRA_INITIALDELAY = "jspwiki.kendra.initialdelay";
089
090    public KendraSearchProvider() {
091    }
092
093    /**
094     * {@inheritDoc}
095     */
096    @Override
097    public void initialize( final Engine engine, final Properties properties ) throws NoRequiredPropertyException, IOException {
098        this.engine = engine;
099        this.properties = properties;
100        this.contentTypes = getContentTypes();
101
102        setKendra( buildClient() );
103
104        this.indexName = TextUtil.getRequiredProperty( this.properties, PROP_KENDRA_INDEX_NAME );
105        this.dataSourceName = TextUtil.getRequiredProperty( this.properties, PROP_KENDRA_DATA_SOURCE_NAME );
106        final int initialDelay = TextUtil.getIntegerProperty( this.properties, PROP_KENDRA_INITIALDELAY, KendraUpdater.INITIAL_DELAY );
107        final int indexDelay = TextUtil.getIntegerProperty( this.properties, PROP_KENDRA_INDEXDELAY, KendraUpdater.INDEX_DELAY );
108
109        // Start the Kendra update thread, which waits first for a little while
110        // before starting to go through the "pages that need updating".
111        if ( initialDelay >= 0 ) {
112            final KendraUpdater updater = new KendraUpdater( engine, this, initialDelay, indexDelay );
113            updater.start();
114        }
115    }
116
117    private Map< String, Object > getContentTypes() {
118        final Gson gson = new GsonBuilder().create();
119        try ( final InputStream in = KendraSearchProvider.class.getResourceAsStream( "content_types.json" ) ) {
120            if ( in != null ) {
121                final Type collectionType = new TypeToken< HashMap< String, Object > >() {
122                }.getType();
123                return gson.fromJson( new InputStreamReader( in ), collectionType );
124            }
125        } catch ( final IOException e ) {
126            LOG.error( format( "Unable to load default propertyfile 'content_types.json': %s", e.getMessage() ), e );
127        }
128        return null;
129    }
130
131    /**
132     * {@inheritDoc}
133     */
134    @Override
135    public String getProviderInfo() {
136        return "KendraSearchProvider";
137    }
138
139    /**
140     * {@inheritDoc}
141     */
142    @Override
143    public void pageRemoved( final Page page ) {
144        final String pageName = page.getName();
145        final BatchDeleteDocumentRequest request = new BatchDeleteDocumentRequest().withIndexId( indexId )
146                .withDocumentIdList( pageName );
147        final BatchDeleteDocumentResult result = getKendra().batchDeleteDocument( request );
148        if ( result.getFailedDocuments().size() == 0 ) {
149            LOG.debug( format( "Page '%s' was removed from index", pageName ) );
150        } else {
151            LOG.error( format( "Failed to remove Page '%s' from index", pageName ) );
152        }
153    }
154
155    /**
156     * {@inheritDoc}
157     */
158    @Override
159    public void reindexPage( final Page page ) {
160        if ( page != null ) {
161            updates.add( page );
162            LOG.debug( format( "Scheduling page '%s' for indexing ...", page.getName() ) );
163        }
164    }
165
166    /**
167     * {@inheritDoc}
168     */
169    @Override
170    public Collection< SearchResult > findPages( final String query, final Context wikiContext ) throws ProviderException, IOException {
171        final QueryRequest request = new QueryRequest().withIndexId( indexId ).withQueryText( query );
172        final List< QueryResultItem > items;
173        try {
174            items = getKendra().query( request ).getResultItems();
175        } catch ( final ThrottlingException e ) {
176            LOG.error( "ThrottlingException. Skipping..." );
177            return new ArrayList<>();
178        }
179        final List< SearchResult > searchResults = new ArrayList<>( items.size() );
180        final AuthorizationManager mgr = engine.getManager( AuthorizationManager.class );
181
182        for ( final QueryResultItem item : items ) {
183            switch( QueryResultType.fromValue( item.getType() ) ) {
184                case DOCUMENT:
185                    final String documentId = item.getDocumentId();
186                    final String documentExcerpt = item.getDocumentExcerpt().getText();
187                    final String scoreConfidence = item.getScoreAttributes().getScoreConfidence();
188                    final Page page = this.engine.getManager( PageManager.class ).getPage( documentId, PageProvider.LATEST_VERSION );
189                    if ( page != null ) {
190                        final PagePermission pp = new PagePermission( page, PagePermission.VIEW_ACTION );
191                        if ( mgr.checkPermission( wikiContext.getWikiSession(), pp ) ) {
192                            final SearchResult searchResult = new SearchResultImpl( page, confidence2score( scoreConfidence ),
193                                    new String[]{ documentExcerpt } );
194                            searchResults.add( searchResult );
195                        } else {
196                            LOG.error( format( "Page '%s' is not accessible", documentId ) );
197                        }
198                    } else {
199                        LOG.error(
200                                format( "Kendra found a result page '%s' that could not be loaded, removing from index", documentId ) );
201                        pageRemoved( Wiki.contents().page( this.engine, documentId ) );
202                    }
203                    break;
204                default:
205                    LOG.error( format( "Unknown query result type: %s", item.getType() ) );
206            }
207        }
208        return searchResults;
209    }
210
211    /**
212     * This method initialize the AWS Kendra Index and Datasources to be used.
213     */
214    public void initializeIndexAndDataSource() {
215        this.indexId = getIndexId( indexName );
216        if ( this.indexId == null ) {
217            final String message = format( "Index '%s' does not exist", indexName );
218            LOG.error( message );
219            throw new IllegalArgumentException( message );
220        }
221        this.dataSourceId = getDatasourceId( this.indexId, dataSourceName );
222        if ( this.dataSourceId == null ) {
223            final String message = format( "Datasource '%s' does not exist in index %s", dataSourceName, indexName );
224            LOG.error( message );
225            throw new IllegalArgumentException( message );
226        }
227    }
228
229    /**
230     * Given an Kendra's Index name, returns the corresponding Index Id, or
231     * {@code null} if it does not exists
232     *
233     * @param indexName the name of the index to look up
234     * @return the index id or {@code null}
235     */
236    private String getIndexId( final String indexName ) {
237        ListIndicesRequest request = new ListIndicesRequest();
238        ListIndicesResult result = getKendra().listIndices( request );
239        String nextToken = "";
240        while ( nextToken != null ) {
241            final List< IndexConfigurationSummary > items = result.getIndexConfigurationSummaryItems();
242            if ( items == null || items.isEmpty() ) {
243                return null;
244            }
245            for ( final IndexConfigurationSummary item : items ) {
246                if ( StringUtils.equals( item.getName(), indexName ) ) {
247                    return item.getId();
248                }
249            }
250            nextToken = result.getNextToken();
251            request = new ListIndicesRequest().withNextToken( result.getNextToken() );
252            result = getKendra().listIndices( request );
253        }
254        return null;
255    }
256
257    /**
258     * Given an Kendra's Datasource name, returns the corresponding Datasource Id,
259     * or {@code null} if it does not exists
260     *
261     * @param dataSourceName the name of the datasource to look up
262     * @return the datasource id or {@code null}
263     */
264    private String getDatasourceId( final String indexId, final String dataSourceName ) {
265        ListDataSourcesRequest request = new ListDataSourcesRequest().withIndexId( indexId );
266        ListDataSourcesResult result = getKendra().listDataSources( request );
267        String nextToken = "";
268        while ( nextToken != null ) {
269            final List< DataSourceSummary > items = result.getSummaryItems();
270            if ( items == null || items.isEmpty() ) {
271                return null;
272            }
273
274            for ( final DataSourceSummary item : items ) {
275                if ( StringUtils.equals( item.getName(), dataSourceName ) ) {
276                    return item.getId();
277                }
278            }
279            nextToken = result.getNextToken();
280            request = new ListDataSourcesRequest().withNextToken( result.getNextToken() );
281            result = getKendra().listDataSources( request );
282        }
283        return null;
284    }
285
286    /*
287     * Converts a SCORE Confidence from Kendra to an "equivalent" integer score
288     */
289    private int confidence2score( final String scoreConfidence ) {
290        switch ( ScoreConfidence.fromValue( scoreConfidence ) ) {
291            case VERY_HIGH:
292                return 100;
293            case HIGH:
294                return 75;
295            case MEDIUM:
296                return 50;
297            case LOW:
298                return 25;
299            default:
300                return 0;
301        }
302    }
303
304    /**
305     * This method re-index all the pages found in the Wiki. It is mainly used at
306     * startup.
307     *
308     * @throws IOException in case some page can not be read
309     */
310    private void doFullReindex() throws IOException {
311        try {
312            final Collection< Page > pages = engine.getManager( PageManager.class ).getAllPages();
313            if ( pages.isEmpty() ) {
314                return;
315            }
316            LOG.debug( format( "Indexing all %d pages. Please wait ...", pages.size() ) );
317            final String executionId = startExecution();
318            for ( final Page page : pages ) {
319                // Since I do not want to handle the size limit
320                // (https://docs.aws.amazon.com/goto/WebAPI/kendra-2019-02-03/BatchPutDocument)
321                // uploading documents one at a time
322                indexOnePage( page, executionId );
323            }
324        } catch ( final ProviderException e ) {
325            LOG.error( e.getMessage() );
326            throw new IOException( e );
327        } finally {
328            stopExecution();
329        }
330    }
331
332    /**
333     * This method re-index all pages marked as updated. It is used to periodically
334     * index pages that have been modified
335     */
336    private void doPartialReindex() {
337        if ( updates.isEmpty() ) {
338            return;
339        }
340        LOG.debug( "Indexing updated pages. Please wait ..." );
341        final String executionId = startExecution();
342        synchronized ( updates ) {
343            try {
344                while ( updates.size() > 0 ) {
345                    indexOnePage( updates.remove( 0 ), executionId );
346                }
347            } finally {
348                stopExecution();
349            }
350        }
351    }
352
353    /**
354     * Returns an ExecutiuonId that is required to keep track of the modifed
355     * documents
356     *
357     * @return The execution id
358     */
359    private String startExecution() {
360        final StartDataSourceSyncJobRequest request = new StartDataSourceSyncJobRequest().withIndexId( indexId )
361                .withId( dataSourceId );
362        final StartDataSourceSyncJobResult result = getKendra().startDataSourceSyncJob( request );
363        return result.getExecutionId();
364    }
365
366    /**
367     * Stop the execution for the given index Id and DataSource Id.
368     */
369    private void stopExecution() {
370        final StopDataSourceSyncJobRequest request = new StopDataSourceSyncJobRequest().withIndexId( indexId ).withId( dataSourceId );
371        getKendra().stopDataSourceSyncJob( request );
372    }
373
374    /**
375     * Index on single {@link Page} into the Kendra Index
376     *
377     * @param page        the {@link Page} to index
378     * @param executionId The Execution Id
379     */
380    private void indexOnePage( final Page page, final String executionId ) {
381        final String pageName = page.getName();
382        try {
383            final Document document = newDocument( page, executionId );
384            final BatchPutDocumentRequest request = new BatchPutDocumentRequest().withIndexId( indexId )
385                    .withDocuments( document );
386            final BatchPutDocumentResult result = getKendra().batchPutDocument( request );
387            if ( result.getFailedDocuments().size() == 0 ) {
388                LOG.info( format( "Successfully indexed Page '%s' as %s", page.getName(), document.getContentType() ) );
389            } else {
390                for ( final BatchPutDocumentResponseFailedDocument failedDocument : result.getFailedDocuments() ) {
391                    LOG.error( format( "Failed to index Page '%s': %s", failedDocument.getId(), failedDocument.getErrorMessage() ) );
392                }
393            }
394        } catch ( final IOException e ) {
395            LOG.error( format( "Failed to index Page '%s': %s", pageName, e.getMessage() ) );
396        }
397    }
398
399
400    /**
401     * Given a {@link Page}, returns the corresponding Kendra {@link Document}.
402     *
403     * @param page        the {@link Page} to be indexed
404     * @param executionId an execution id to identify when the {@link Page} was
405     *                    indexed for the last time.
406     * @return a {@link Document} containing the searchable attributes.
407     * @throws IOException if the {@link Page}'s {@link Attachment} can not be read.
408     */
409    private Document newDocument( final Page page, final String executionId ) throws IOException {
410        final String pageName = page.getName();
411        final List< DocumentAttribute > attrs = new ArrayList<>();
412        // These 2 are required as per
413        // https://docs.aws.amazon.com/kendra/latest/dg/data-source-custom.html#custom-required-attributes
414        attrs.add( newAttribute( "_data_source_id", dataSourceId ) );
415        attrs.add( newAttribute( "_data_source_sync_job_execution_id", executionId ) );
416
417        final String title = TextUtil.beautifyString( pageName );
418        ByteBuffer blob;
419        ContentType contentType = ContentType.PLAIN_TEXT;
420        if ( page instanceof Attachment ) {
421            final Attachment attachment = ( Attachment ) page;
422            InputStream is = null;
423            try {
424                final String filename = attachment.getFileName();
425                contentType = getContentType( filename );
426                is = engine.getManager( AttachmentManager.class ).getAttachmentStream( attachment );
427                blob = ByteBuffer.wrap( IOUtils.toByteArray( is ) );
428            } catch ( final ProviderException e ) {
429                throw new IOException( e );
430            } finally {
431                IOUtils.closeQuietly( is, null );
432            }
433            // contentType should be set to its real value
434        } else {
435            final String text = engine.getManager( PageManager.class ).getPureText( page );
436            blob = ByteBuffer.wrap( text.getBytes( StandardCharsets.UTF_8 ) );
437        }
438        return new Document().withId( pageName ).withTitle( title ).withAttributes( attrs ).withBlob( blob )
439                .withContentType( contentType );
440    }
441
442    private DocumentAttribute newAttribute( final String key, final String value ) {
443        return new DocumentAttribute().withKey( key ).withValue( new DocumentAttributeValue().withStringValue( value ) );
444    }
445
446    @SuppressWarnings( "unchecked" )
447    private ContentType getContentType( final String filename ) {
448        final String extention = FilenameUtils.getExtension( filename );
449        final Map< String, String > ct = ( Map< String, String > ) this.contentTypes.get( "ContentTypes" );
450        return ContentType.fromValue( ct.getOrDefault( extention, ContentType.PLAIN_TEXT.name() ) );
451    }
452
453    /**
454     * Updater thread that updates Kendra indexes.
455     */
456    private static final class KendraUpdater extends WikiBackgroundThread {
457        static final int INDEX_DELAY = 5;
458        static final int INITIAL_DELAY = 10;
459        private final KendraSearchProvider provider;
460
461        private final int initialDelay;
462
463        private WatchDog watchdog;
464
465        private KendraUpdater( final Engine engine, final KendraSearchProvider provider, final int initialDelay, final int indexDelay ) {
466            super( engine, indexDelay );
467            this.provider = provider;
468            this.initialDelay = initialDelay;
469            setName( "JSPWiki Kendra Indexer" );
470        }
471
472        @Override
473        public void startupTask() throws Exception {
474            watchdog = WatchDog.getCurrentWatchDog( getEngine() );
475            try {
476                Thread.sleep( initialDelay * 1000L );
477            } catch ( final InterruptedException e ) {
478                throw new InternalWikiException( "Interrupted while waiting to start.", e );
479            }
480            watchdog.enterState( "Full reindex" );
481            provider.initializeIndexAndDataSource();
482            provider.doFullReindex();
483            watchdog.exitState();
484        }
485
486        @Override
487        public void backgroundTask() {
488            watchdog.enterState( "Reindexing ...", 60 );
489            provider.doPartialReindex();
490            watchdog.exitState();
491        }
492    }
493
494    private static class SearchResultImpl implements SearchResult {
495
496        private final Page page;
497        private final int score;
498        private final String[] contexts;
499
500        public SearchResultImpl( final Page page, final int score, final String[] contexts ) {
501            this.page = page;
502            this.score = score;
503            this.contexts = contexts != null ? contexts.clone() : null;
504        }
505
506        @Override
507        public Page getPage() {
508            return this.page;
509        }
510
511        @Override
512        public int getScore() {
513            return this.score;
514        }
515
516        @Override
517        public String[] getContexts() {
518            return this.contexts;
519        }
520    }
521
522    public AWSkendra getKendra() {
523        return kendra;
524    }
525
526    public void setKendra( final AWSkendra kendra ) {
527        this.kendra = kendra;
528    }
529
530    protected AWSkendra buildClient() {
531        return AWSkendraClientBuilder.defaultClient();
532    }
533
534    public String getIndexName() {
535        return indexName;
536    }
537
538    public String getDataSourceName() {
539        return dataSourceName;
540    }
541
542}