Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 21 additions & 41 deletions action/log/logadapter/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (

elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
elastic "github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"

"github.com/seata/seata-ctl/tool"
)

// QueryLogs is a function that queries specific documents
func (e *Elasticsearch) QueryLogs(filter map[string]interface{}, currency *Currency, number int) error {
client, err := createElasticClient(currency)
client, err := createEsDefaultClient(currency)
if err != nil {
return fmt.Errorf("failed to create elasticsearch client: %w", err)
}
Expand All @@ -33,47 +35,21 @@ func (e *Elasticsearch) QueryLogs(filter map[string]interface{}, currency *Curre
return err
}

// Execute the search query
searchResult, err := client.Search().
Index(indexName).
Size(number).
Query(query).
Do(context.Background())
res, err := client.Search().Index(indexName).Size(number).Query(query).Do(context.Background())

if err != nil {
return fmt.Errorf("error fetching documents: %w", err)
}

err = processSearchHits(searchResult, currency)
err = processSearchHits(res, currency)
if err != nil {
return err
}
return nil
}

// createElasticClient configures and creates a new Elasticsearch client
func createElasticClient(currency *Currency) (*elastic.Client, error) {
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}

client, err := elastic.NewClient(
elastic.SetURL(currency.Address),
elastic.SetHttpClient(httpClient),
elastic.SetSniff(false),
elastic.SetBasicAuth(currency.Username, currency.Password),
)
if err != nil {
return nil, err
}
return client, nil
}

// createEsDefaultClient configures and creates a new Elasticsearch client
func createEsDefaultClient(currency *Currency) (*elasticsearch.Client, error) {
func createEsDefaultClient(currency *Currency) (*elasticsearch.TypedClient, error) {
// Configure the Elasticsearch client
cfg := elasticsearch.Config{
Addresses: []string{
Expand All @@ -88,22 +64,22 @@ func createEsDefaultClient(currency *Currency) (*elasticsearch.Client, error) {
}

// Create the client instance
es, err := elasticsearch.NewClient(cfg)
es, err := elasticsearch.NewTypedClient(cfg)
if err != nil {
return nil, fmt.Errorf("error creating the client: %s", err)
}
return es, nil
}

// processSearchHits handles and formats the search results
func processSearchHits(searchResult *elastic.SearchResult, currency *Currency) error {
if len(searchResult.Hits.Hits) == 0 {
func processSearchHits(res *search.Response, currency *Currency) error {
if len(res.Hits.Hits) == 0 {
return fmt.Errorf("no documents found")
}

for _, hit := range searchResult.Hits.Hits {
for _, hit := range res.Hits.Hits {
var doc map[string]interface{}
if err := json.Unmarshal(hit.Source, &doc); err != nil {
if err := json.Unmarshal(hit.Source_, &doc); err != nil {
return fmt.Errorf("failed to unmarshal document: %w", err)
}

Expand Down Expand Up @@ -257,21 +233,25 @@ func removeKeywordSuffix(input []string) []string {
return result
}

// buildQuery constructs a BoolQuery based on the provided filter and index fields
func buildQuery(filter map[string]interface{}, indexFields []string) (*elastic.BoolQuery, error) {
query := elastic.NewBoolQuery()
// buildQuery constructs a types Query based on the provided filter and index fields
func buildQuery(filter map[string]interface{}, indexFields []string) (*types.Query, error) {
query := &types.Query{}
if filter["query"].(string) != "{}" {
indexMap, err := ParseJobString(filter["query"].(string))
if err != nil {
return query, err
}
var termQuery []types.Query
for k, v := range indexMap {
if Contains(indexFields, k) {
query.Should(elastic.NewTermQuery(k, v))
termQuery = append(termQuery, types.Query{Term: map[string]types.TermQuery{k: {Value: v}}})
} else {
return query, fmt.Errorf("invalid index key: %s", k)
}
}
query.Bool = &types.BoolQuery{
Should: termQuery,
}
}
return query, nil
}
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.15.0
github.com/guptarohit/asciigraph v0.7.3
github.com/jedib0t/go-pretty/v6 v6.4.7
github.com/olivere/elastic/v7 v7.0.32
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.16.0
Expand Down Expand Up @@ -48,7 +47,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand Down Expand Up @@ -205,15 +203,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
Expand Down
Loading