2020-08-30 19:08:01 +03:00
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package code
import (
2021-03-04 05:57:01 +03:00
"bufio"
2020-08-30 19:08:01 +03:00
"context"
2022-01-27 11:30:51 +03:00
"errors"
2020-08-30 19:08:01 +03:00
"fmt"
2021-03-04 05:57:01 +03:00
"io"
2022-01-27 11:30:51 +03:00
"net"
2020-08-30 19:08:01 +03:00
"strconv"
"strings"
2022-01-27 11:30:51 +03:00
"sync"
2020-08-30 19:08:01 +03:00
"time"
2021-12-10 04:27:50 +03:00
repo_model "code.gitea.io/gitea/models/repo"
2020-08-30 19:08:01 +03:00
"code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
2022-01-27 11:30:51 +03:00
"code.gitea.io/gitea/modules/graceful"
2021-07-24 19:03:58 +03:00
"code.gitea.io/gitea/modules/json"
2020-08-30 19:08:01 +03:00
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
2021-06-05 15:32:19 +03:00
"code.gitea.io/gitea/modules/typesniffer"
2020-08-30 19:08:01 +03:00
"github.com/go-enry/go-enry/v2"
"github.com/olivere/elastic/v7"
)
const (
	// esRepoIndexerLatestVersion is the version number that realIndexerName
	// appends (as ".v<N>") to the alias name to form the concrete index name.
	esRepoIndexerLatestVersion = 1

	// Multi-match query types; only these two are used at the moment.
	// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
	esMultiMatchTypeBestFields   = "best_fields"
	esMultiMatchTypePhrasePrefix = "phrase_prefix"
)
2022-01-20 20:46:10 +03:00
// Compile-time check that *ElasticSearchIndexer satisfies Indexer.
var _ Indexer = (*ElasticSearchIndexer)(nil)
2020-08-30 19:08:01 +03:00
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
2022-01-27 11:30:51 +03:00
client * elastic . Client
indexerAliasName string
available bool
availabilityCallback func ( bool )
stopTimer chan struct { }
lock sync . RWMutex
2020-08-30 19:08:01 +03:00
}
type elasticLogger struct {
2020-10-31 08:36:46 +03:00
log . Logger
2020-08-30 19:08:01 +03:00
}
// Printf forwards the formatted message to the wrapped logger at the logger's
// own current level, with a skip depth of 2. The error returned by Log is
// deliberately ignored: there is nowhere useful to report a logging failure.
func (l elasticLogger) Printf(format string, args ...interface{}) {
	wrapped := l.Logger
	_ = wrapped.Log(2, wrapped.GetLevel(), format, args...)
}
// NewElasticSearchIndexer creates a new elasticsearch indexer
func NewElasticSearchIndexer ( url , indexerName string ) ( * ElasticSearchIndexer , bool , error ) {
opts := [ ] elastic . ClientOptionFunc {
elastic . SetURL ( url ) ,
elastic . SetSniff ( false ) ,
elastic . SetHealthcheckInterval ( 10 * time . Second ) ,
elastic . SetGzip ( false ) ,
}
logger := elasticLogger { log . GetLogger ( log . DEFAULT ) }
if logger . GetLevel ( ) == log . TRACE || logger . GetLevel ( ) == log . DEBUG {
opts = append ( opts , elastic . SetTraceLog ( logger ) )
} else if logger . GetLevel ( ) == log . ERROR || logger . GetLevel ( ) == log . CRITICAL || logger . GetLevel ( ) == log . FATAL {
opts = append ( opts , elastic . SetErrorLog ( logger ) )
} else if logger . GetLevel ( ) == log . INFO || logger . GetLevel ( ) == log . WARN {
opts = append ( opts , elastic . SetInfoLog ( logger ) )
}
client , err := elastic . NewClient ( opts ... )
if err != nil {
return nil , false , err
}
indexer := & ElasticSearchIndexer {
client : client ,
indexerAliasName : indexerName ,
2022-01-27 11:30:51 +03:00
available : true ,
stopTimer : make ( chan struct { } ) ,
2020-08-30 19:08:01 +03:00
}
2022-01-27 11:30:51 +03:00
ticker := time . NewTicker ( 10 * time . Second )
go func ( ) {
for {
select {
case <- ticker . C :
indexer . checkAvailability ( )
case <- indexer . stopTimer :
ticker . Stop ( )
return
}
}
} ( )
2020-08-30 19:08:01 +03:00
exists , err := indexer . init ( )
2021-11-15 16:16:11 +03:00
if err != nil {
indexer . Close ( )
return nil , false , err
}
2020-08-30 19:08:01 +03:00
return indexer , ! exists , err
}
const (
	// defaultMapping is the index body sent when the versioned index is created.
	// It must be valid JSON: the whole literal is passed verbatim to CreateIndex.
	// "content" stores term vectors with positions and offsets, which the "fvh"
	// highlighter requested by Search needs.
	defaultMapping = `{
		"mappings": {
			"properties": {
				"repo_id": {
					"type": "long",
					"index": true
				},
				"content": {
					"type": "text",
					"term_vector": "with_positions_offsets",
					"index": true
				},
				"commit_id": {
					"type": "keyword",
					"index": true
				},
				"language": {
					"type": "keyword",
					"index": true
				},
				"updated_at": {
					"type": "long",
					"index": true
				}
			}
		}
	}`
)
// realIndexerName returns the concrete index name behind the alias:
// "<alias>.v<esRepoIndexerLatestVersion>".
func (b *ElasticSearchIndexer) realIndexerName() string {
	const nameFormat = "%s.v%d"
	return fmt.Sprintf(nameFormat, b.indexerAliasName, esRepoIndexerLatestVersion)
}
// Init will initialize the indexer
func ( b * ElasticSearchIndexer ) init ( ) ( bool , error ) {
2022-01-27 11:30:51 +03:00
ctx := graceful . GetManager ( ) . HammerContext ( )
2020-08-30 19:08:01 +03:00
exists , err := b . client . IndexExists ( b . realIndexerName ( ) ) . Do ( ctx )
if err != nil {
2022-01-27 11:30:51 +03:00
return false , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
if ! exists {
2022-01-20 20:46:10 +03:00
mapping := defaultMapping
2020-08-30 19:08:01 +03:00
createIndex , err := b . client . CreateIndex ( b . realIndexerName ( ) ) . BodyString ( mapping ) . Do ( ctx )
if err != nil {
2022-01-27 11:30:51 +03:00
return false , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
if ! createIndex . Acknowledged {
return false , fmt . Errorf ( "create index %s with %s failed" , b . realIndexerName ( ) , mapping )
}
}
// check version
r , err := b . client . Aliases ( ) . Do ( ctx )
if err != nil {
2022-01-27 11:30:51 +03:00
return false , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
realIndexerNames := r . IndicesByAlias ( b . indexerAliasName )
if len ( realIndexerNames ) < 1 {
res , err := b . client . Alias ( ) .
Add ( b . realIndexerName ( ) , b . indexerAliasName ) .
Do ( ctx )
if err != nil {
2022-01-27 11:30:51 +03:00
return false , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
if ! res . Acknowledged {
2022-01-27 11:30:51 +03:00
return false , fmt . Errorf ( "create alias %s to index %s failed" , b . indexerAliasName , b . realIndexerName ( ) )
2020-08-30 19:08:01 +03:00
}
} else if len ( realIndexerNames ) >= 1 && realIndexerNames [ 0 ] < b . realIndexerName ( ) {
log . Warn ( "Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed." ,
realIndexerNames [ 0 ] , b . realIndexerName ( ) )
res , err := b . client . Alias ( ) .
Remove ( realIndexerNames [ 0 ] , b . indexerAliasName ) .
Add ( b . realIndexerName ( ) , b . indexerAliasName ) .
Do ( ctx )
if err != nil {
2022-01-27 11:30:51 +03:00
return false , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
if ! res . Acknowledged {
2022-01-27 11:30:51 +03:00
return false , fmt . Errorf ( "change alias %s to index %s failed" , b . indexerAliasName , b . realIndexerName ( ) )
2020-08-30 19:08:01 +03:00
}
}
return exists , nil
}
2022-01-27 11:30:51 +03:00
// SetAvailabilityChangeCallback registers the function that setAvailability
// invokes whenever the availability flag changes. It replaces any callback
// set earlier.
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
	b.lock.Lock()
	b.availabilityCallback = callback
	b.lock.Unlock()
}
// Ping reports the last recorded availability of elastic. It only reads the
// cached flag under the read lock; it does not issue a request itself.
func (b *ElasticSearchIndexer) Ping() bool {
	b.lock.RLock()
	available := b.available
	b.lock.RUnlock()
	return available
}
2022-01-20 02:26:57 +03:00
func ( b * ElasticSearchIndexer ) addUpdate ( ctx context . Context , batchWriter git . WriteCloserError , batchReader * bufio . Reader , sha string , update fileUpdate , repo * repo_model . Repository ) ( [ ] elastic . BulkableRequest , error ) {
2020-09-07 18:05:08 +03:00
// Ignore vendored files in code search
2021-04-01 20:41:09 +03:00
if setting . Indexer . ExcludeVendored && analyze . IsVendor ( update . Filename ) {
2020-09-07 18:05:08 +03:00
return nil , nil
}
2021-02-18 00:32:25 +03:00
size := update . Size
2022-04-01 05:55:30 +03:00
var err error
2021-02-18 00:32:25 +03:00
if ! update . Sized {
2022-04-01 05:55:30 +03:00
var stdout string
2022-10-23 17:44:45 +03:00
stdout , _ , err = git . NewCommand ( ctx , "cat-file" , "-s" ) . AddDynamicArguments ( update . BlobSha ) . RunStdString ( & git . RunOpts { Dir : repo . RepoPath ( ) } )
2021-02-18 00:32:25 +03:00
if err != nil {
return nil , err
}
if size , err = strconv . ParseInt ( strings . TrimSpace ( stdout ) , 10 , 64 ) ; err != nil {
2022-01-27 11:30:51 +03:00
return nil , fmt . Errorf ( "misformatted git cat-file output: %v" , err )
2021-02-18 00:32:25 +03:00
}
2020-08-30 19:08:01 +03:00
}
2021-02-18 00:32:25 +03:00
if size > setting . Indexer . MaxIndexerFileSize {
2020-08-30 19:08:01 +03:00
return [ ] elastic . BulkableRequest { b . addDelete ( update . Filename , repo ) } , nil
}
2021-03-04 05:57:01 +03:00
if _ , err := batchWriter . Write ( [ ] byte ( update . BlobSha + "\n" ) ) ; err != nil {
return nil , err
}
2022-04-01 05:55:30 +03:00
_ , _ , size , err = git . ReadBatchLine ( batchReader )
2021-03-04 05:57:01 +03:00
if err != nil {
return nil , err
}
2021-09-22 08:38:34 +03:00
fileContents , err := io . ReadAll ( io . LimitReader ( batchReader , size ) )
2020-08-30 19:08:01 +03:00
if err != nil {
return nil , err
2021-06-05 15:32:19 +03:00
} else if ! typesniffer . DetectContentType ( fileContents ) . IsText ( ) {
2020-08-30 19:08:01 +03:00
// FIXME: UTF-16 files will probably fail here
return nil , nil
}
2021-06-21 01:00:46 +03:00
if _ , err = batchReader . Discard ( 1 ) ; err != nil {
return nil , err
}
2020-08-30 19:08:01 +03:00
id := filenameIndexerID ( repo . ID , update . Filename )
return [ ] elastic . BulkableRequest {
elastic . NewBulkIndexRequest ( ) .
Index ( b . indexerAliasName ) .
Id ( id ) .
Doc ( map [ string ] interface { } {
"repo_id" : repo . ID ,
"content" : string ( charset . ToUTF8DropErrors ( fileContents ) ) ,
"commit_id" : sha ,
"language" : analyze . GetCodeLanguage ( update . Filename , fileContents ) ,
"updated_at" : timeutil . TimeStampNow ( ) ,
} ) ,
} , nil
}
2021-12-10 04:27:50 +03:00
func ( b * ElasticSearchIndexer ) addDelete ( filename string , repo * repo_model . Repository ) elastic . BulkableRequest {
2020-08-30 19:08:01 +03:00
id := filenameIndexerID ( repo . ID , filename )
return elastic . NewBulkDeleteRequest ( ) .
Index ( b . indexerAliasName ) .
Id ( id )
}
// Index will save the index data
2022-01-20 02:26:57 +03:00
func ( b * ElasticSearchIndexer ) Index ( ctx context . Context , repo * repo_model . Repository , sha string , changes * repoChanges ) error {
2020-08-30 19:08:01 +03:00
reqs := make ( [ ] elastic . BulkableRequest , 0 )
2021-03-04 05:57:01 +03:00
if len ( changes . Updates ) > 0 {
2021-12-16 22:01:14 +03:00
// Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first!
2022-07-01 02:48:25 +03:00
if err := git . EnsureValidGitRepository ( ctx , repo . RepoPath ( ) ) ; err != nil {
2021-12-16 22:01:14 +03:00
log . Error ( "Unable to open git repo: %s for %-v: %v" , repo . RepoPath ( ) , repo , err )
return err
}
2021-03-04 05:57:01 +03:00
2022-01-20 02:26:57 +03:00
batchWriter , batchReader , cancel := git . CatFileBatch ( ctx , repo . RepoPath ( ) )
2021-03-04 05:57:01 +03:00
defer cancel ( )
for _ , update := range changes . Updates {
2022-01-20 02:26:57 +03:00
updateReqs , err := b . addUpdate ( ctx , batchWriter , batchReader , sha , update , repo )
2021-03-04 05:57:01 +03:00
if err != nil {
return err
}
if len ( updateReqs ) > 0 {
reqs = append ( reqs , updateReqs ... )
}
2020-08-30 19:08:01 +03:00
}
2021-03-04 05:57:01 +03:00
cancel ( )
2020-08-30 19:08:01 +03:00
}
for _ , filename := range changes . RemovedFilenames {
reqs = append ( reqs , b . addDelete ( filename , repo ) )
}
if len ( reqs ) > 0 {
_ , err := b . client . Bulk ( ) .
Index ( b . indexerAliasName ) .
Add ( reqs ... ) .
2022-01-27 11:30:51 +03:00
Do ( ctx )
return b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
return nil
}
// Delete deletes indexes by ids
func ( b * ElasticSearchIndexer ) Delete ( repoID int64 ) error {
_ , err := b . client . DeleteByQuery ( b . indexerAliasName ) .
Query ( elastic . NewTermsQuery ( "repo_id" , repoID ) ) .
2022-01-27 11:30:51 +03:00
Do ( graceful . GetManager ( ) . HammerContext ( ) )
return b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
2020-09-12 15:31:52 +03:00
// indexPos finds the first occurrence of start in content and the first
// occurrence of end that follows it. It returns the position at which start
// begins and the position just past the last byte of that end string.
// If either of them cannot be found, it returns -1, -1.
func indexPos(content, start, end string) (int, int) {
	// Without the opening marker there is nothing to report.
	begin := strings.Index(content, start)
	if begin == -1 {
		return -1, -1
	}

	// Look for the closing marker only in the text after the opening marker.
	afterStart := begin + len(start)
	rel := strings.Index(content[afterStart:], end)
	if rel == -1 {
		return -1, -1
	}

	return begin, afterStart + rel + len(end)
}
2020-08-30 19:08:01 +03:00
func convertResult ( searchResult * elastic . SearchResult , kw string , pageSize int ) ( int64 , [ ] * SearchResult , [ ] * SearchResultLanguages , error ) {
hits := make ( [ ] * SearchResult , 0 , pageSize )
for _ , hit := range searchResult . Hits . Hits {
// FIXME: There is no way to get the position the keyword on the content currently on the same request.
// So we get it from content, this may made the query slower. See
// https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291
2022-06-20 13:02:49 +03:00
var startIndex , endIndex int
2020-08-30 19:08:01 +03:00
c , ok := hit . Highlight [ "content" ]
if ok && len ( c ) > 0 {
2021-07-08 14:38:13 +03:00
// FIXME: Since the highlighting content will include <em> and </em> for the keywords,
// now we should find the positions. But how to avoid html content which contains the
2020-09-12 15:31:52 +03:00
// <em> and </em> tags? If elastic search has handled that?
startIndex , endIndex = indexPos ( c [ 0 ] , "<em>" , "</em>" )
if startIndex == - 1 {
panic ( fmt . Sprintf ( "1===%s,,,%#v,,,%s" , kw , hit . Highlight , c [ 0 ] ) )
2020-08-30 19:08:01 +03:00
}
} else {
panic ( fmt . Sprintf ( "2===%#v" , hit . Highlight ) )
}
repoID , fileName := parseIndexerID ( hit . Id )
2022-01-20 20:46:10 +03:00
res := make ( map [ string ] interface { } )
2020-08-30 19:08:01 +03:00
if err := json . Unmarshal ( hit . Source , & res ) ; err != nil {
return 0 , nil , nil , err
}
language := res [ "language" ] . ( string )
hits = append ( hits , & SearchResult {
RepoID : repoID ,
Filename : fileName ,
CommitID : res [ "commit_id" ] . ( string ) ,
Content : res [ "content" ] . ( string ) ,
UpdatedUnix : timeutil . TimeStamp ( res [ "updated_at" ] . ( float64 ) ) ,
Language : language ,
StartIndex : startIndex ,
2020-09-12 15:31:52 +03:00
EndIndex : endIndex - 9 , // remove the length <em></em> since we give Content the original data
2020-08-30 19:08:01 +03:00
Color : enry . GetColor ( language ) ,
} )
}
return searchResult . TotalHits ( ) , hits , extractAggs ( searchResult ) , nil
}
// extractAggs reads the "language" terms aggregation from searchResult and
// returns one entry per bucket with the language name, its color and the
// document count. It returns nil when the aggregation is absent.
func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
	agg, found := searchResult.Aggregations.Terms("language")
	if !found {
		return nil
	}

	languages := make([]*SearchResultLanguages, 0, 10)
	for _, bucket := range agg.Buckets {
		name := bucket.Key.(string)
		languages = append(languages, &SearchResultLanguages{
			Language: name,
			Color:    enry.GetColor(name),
			Count:    int(bucket.DocCount),
		})
	}
	return languages
}
// Search searches for codes and language stats by given conditions.
2022-01-27 11:30:51 +03:00
func ( b * ElasticSearchIndexer ) Search ( ctx context . Context , repoIDs [ ] int64 , language , keyword string , page , pageSize int , isMatch bool ) ( int64 , [ ] * SearchResult , [ ] * SearchResultLanguages , error ) {
2021-01-27 13:00:35 +03:00
searchType := esMultiMatchTypeBestFields
if isMatch {
searchType = esMultiMatchTypePhrasePrefix
}
kwQuery := elastic . NewMultiMatchQuery ( keyword , "content" ) . Type ( searchType )
2020-08-30 19:08:01 +03:00
query := elastic . NewBoolQuery ( )
query = query . Must ( kwQuery )
if len ( repoIDs ) > 0 {
2022-01-20 20:46:10 +03:00
repoStrs := make ( [ ] interface { } , 0 , len ( repoIDs ) )
2020-08-30 19:08:01 +03:00
for _ , repoID := range repoIDs {
repoStrs = append ( repoStrs , repoID )
}
repoQuery := elastic . NewTermsQuery ( "repo_id" , repoStrs ... )
query = query . Must ( repoQuery )
}
var (
start int
kw = "<em>" + keyword + "</em>"
aggregation = elastic . NewTermsAggregation ( ) . Field ( "language" ) . Size ( 10 ) . OrderByCountDesc ( )
)
if page > 0 {
start = ( page - 1 ) * pageSize
}
if len ( language ) == 0 {
searchResult , err := b . client . Search ( ) .
Index ( b . indexerAliasName ) .
Aggregation ( "language" , aggregation ) .
Query ( query ) .
2020-09-12 15:31:52 +03:00
Highlight (
elastic . NewHighlight ( ) .
Field ( "content" ) .
NumOfFragments ( 0 ) . // return all highting content on fragments
HighlighterType ( "fvh" ) ,
) .
2020-08-30 19:08:01 +03:00
Sort ( "repo_id" , true ) .
From ( start ) . Size ( pageSize ) .
2022-01-27 11:30:51 +03:00
Do ( ctx )
2020-08-30 19:08:01 +03:00
if err != nil {
2022-01-27 11:30:51 +03:00
return 0 , nil , nil , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
return convertResult ( searchResult , kw , pageSize )
}
langQuery := elastic . NewMatchQuery ( "language" , language )
countResult , err := b . client . Search ( ) .
Index ( b . indexerAliasName ) .
Aggregation ( "language" , aggregation ) .
Query ( query ) .
Size ( 0 ) . // We only needs stats information
2022-01-27 11:30:51 +03:00
Do ( ctx )
2020-08-30 19:08:01 +03:00
if err != nil {
2022-01-27 11:30:51 +03:00
return 0 , nil , nil , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
query = query . Must ( langQuery )
searchResult , err := b . client . Search ( ) .
Index ( b . indexerAliasName ) .
Query ( query ) .
2020-09-12 15:31:52 +03:00
Highlight (
elastic . NewHighlight ( ) .
Field ( "content" ) .
NumOfFragments ( 0 ) . // return all highting content on fragments
HighlighterType ( "fvh" ) ,
) .
2020-08-30 19:08:01 +03:00
Sort ( "repo_id" , true ) .
From ( start ) . Size ( pageSize ) .
2022-01-27 11:30:51 +03:00
Do ( ctx )
2020-08-30 19:08:01 +03:00
if err != nil {
2022-01-27 11:30:51 +03:00
return 0 , nil , nil , b . checkError ( err )
2020-08-30 19:08:01 +03:00
}
total , hits , _ , err := convertResult ( searchResult , kw , pageSize )
return total , hits , extractAggs ( countResult ) , err
}
// Close implements indexer
2022-01-27 11:30:51 +03:00
func ( b * ElasticSearchIndexer ) Close ( ) {
select {
case <- b . stopTimer :
default :
close ( b . stopTimer )
}
}
// checkError inspects err and marks the indexer unavailable when it is a
// connection failure: either elastic reports a connection error, or err wraps
// a *net.OpError from a "dial" or "read" operation. err is always returned
// unchanged.
func (b *ElasticSearchIndexer) checkError(err error) error {
	connFailed := elastic.IsConnErr(err)
	if !connFailed {
		var opErr *net.OpError
		connFailed = errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read")
	}
	if connFailed {
		b.setAvailability(false)
	}
	return err
}
// checkAvailability probes the server only while the indexer is marked
// unavailable, and records the outcome: available again when the cluster
// state request succeeds, still unavailable otherwise.
func (b *ElasticSearchIndexer) checkAvailability() {
	if b.Ping() {
		return
	}

	// Request cluster state to check if elastic is available again
	_, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
	b.setAvailability(err == nil)
}
// setAvailability records the new availability and, only when the value
// actually changed, notifies the registered callback (if any).
func (b *ElasticSearchIndexer) setAvailability(available bool) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.available != available {
		b.available = available
		if notify := b.availabilityCallback; notify != nil {
			// Call the callback from within the lock to ensure that the ordering remains correct
			notify(available)
		}
	}
}