Skip to content

ES Generator #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions expr/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,24 @@ var (

type (

// A Node is an element in the expression tree, implemented
// Node is an element in the expression tree, implemented
// by different types (binary, unary, func, identity, etc)
//
// - qlbridge does not currently implement statements (if, for, switch, etc)
// just expressions, and operators
// qlbridge does not currently implement statements (if, for, switch, etc)
// just expressions, and operators
Node interface {
// string representation of Node parseable back to itself
// String representation of Node parseable back to itself
String() string

// Given a dialect writer write out, equivalent of String()
// but allows different escape characters
// WriteDialect Given a dialect writer write string representation,
// equivalent of String() but allows the DialectWriter to control
// escaping characters.
WriteDialect(w DialectWriter)

// Validate Syntax validation of this expression node
// Validate Syntax of this node
Validate() error

// Protobuf helpers that convert to serializeable format and marshall
// Protobuf helpers that convert to serializable format and marshal
NodePb() *NodePb
FromPB(*NodePb) Node

Expand All @@ -86,15 +87,16 @@ type (
Expr() *Expr
FromExpr(*Expr) error

// for testing purposes
// Equal Run Deep equality check
Equal(Node) bool

// Get the Type: String, Identity, etc
// NodeType Describe the node type: String, Identity, Func, Binary...
NodeType() string
}

// A negateable node requires a special type of String() function due to
// an enclosing urnary NOT being inserted into middle of string syntax
// NegateableNode describes a Node which can be negated, so that double-negated
// expressions can have their negation reversed. It requires a special type of
// String() function because an enclosing unary NOT is inserted into the middle of the string syntax.
//
// <expression> [NOT] IN ("a","b")
// <expression> [NOT] BETWEEN <expression> AND <expression>
Expand All @@ -103,9 +105,10 @@ type (
// <expression> [NOT] INTERSECTS ("a", "b")
//
NegateableNode interface {
// Must implement Node as well obviously.
Node
// If the node is negateable, we may collapse an surrounding
// negation into here
// If the node is negateable, we may collapse surrounding
// negation into this statement
Negated() bool
// Reverse Negation if Possible: for instance:
// "A" NOT IN ("a","b") => "A" IN ("a","b")
Expand All @@ -116,37 +119,38 @@ type (
Node() Node
}

// Eval context, used to contain info for usage/lookup at runtime evaluation
// EvalContext used to contain info for usage/lookup at runtime evaluation
EvalContext interface {
ContextReader
}
// Eval context, used to contain info for usage/lookup at runtime evaluation
// EvalIncludeContext, used to contain info for usage/lookup at runtime evaluation
EvalIncludeContext interface {
ContextReader
Includer
}

// Context Reader is a key-value interface to read the context of message/row
// using a Get("key") interface. Used by vm to evaluate messages
// ContextReader is a key-value interface to read the values of identities as
// defined in expressions using a Get("key") interface.
ContextReader interface {
Get(key string) (value.Value, bool)
Row() map[string]value.Value
Ts() time.Time
}

// For evaluation storage
// vm writes results to this after evaluation
// ContextWriter VM can write results of evaluation to this storage.
ContextWriter interface {
Put(col SchemaInfo, readCtx ContextReader, v value.Value) error
Delete(row map[string]value.Value) error
}

// ContextReadWriter can both read (provide identity lookup) and
// write results from vm.
ContextReadWriter interface {
ContextReader
ContextWriter
}

// for committing row ops (insert, update)
// RowWriter is for committing row ops (insert, update)
RowWriter interface {
Commit(rowInfo []SchemaInfo, row RowWriter) error
Put(col SchemaInfo, readCtx ContextReader, v value.Value) error
Expand All @@ -155,10 +159,13 @@ type (

type (

// The generic Expr
// Expr is a generic Expression Node that is able to be serialized
// to json for api usage. It is a generic data structure and one of
// [ (Op,Args), Identity, Value] will be set.
Expr struct {
// The token, and node expressions are non
// nil if it is an expression
// Op defines the token type of the Expression. Node expressions are non
// nil if it is an expression. If Op exists then Args will exist and
// Value/Identity will be nil.
Op string `json:"op,omitempty"`
Args []*Expr `json:"args,omitempty"`

Expand Down
4 changes: 2 additions & 2 deletions generators/elasticsearch/es2gen/bridgeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func scalar(node expr.Node) (interface{}, bool) {
func makeRange(lhs *gentypes.FieldType, op lex.TokenType, rhs expr.Node) (interface{}, error) {
rhsval, ok := scalar(rhs)
if !ok {
return nil, fmt.Errorf("qlindex: unsupported type for comparison: %T", rhs)
return nil, fmt.Errorf("unsupported type for comparison: %T", rhs)
}

// Convert scalars from strings to floats if lhs is numeric and rhs is a
Expand Down Expand Up @@ -141,7 +141,7 @@ func makeRange(lhs *gentypes.FieldType, op lex.TokenType, rhs expr.Node) (interf
case lex.TokenLT:
r.Range = map[string]RangeQry{fieldName: {LT: rhsval}}
default:
return nil, fmt.Errorf("qlindex: unsupported range operator %s", op)
return nil, fmt.Errorf("unsupported range operator %s", op)
}
if lhs.Nested() {
return Nested(lhs, r), nil
Expand Down
55 changes: 33 additions & 22 deletions generators/elasticsearch/es2gen/esgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,37 @@ var (
MaxDepth = 1000

_ = gou.EMPTY

// Func Generator Registry for functions that you want to be
// able to convert to es statements
fg *gentypes.FuncGenRegistry
)

// copy-pasta from entity to avoid the import
// when we actually parameterize this we will need to do it differently anyway
func init() {
// Create the package-level function generator registry, registered
// under the name "elasticsearch2", at package initialization time.
fg = gentypes.NewFuncGenRegistry("elasticsearch2")
}

// DayBucket maps a point in time to an integer day bucket: the number of
// whole 24-hour periods between the Unix epoch and dt, computed from the
// nanosecond Unix timestamp with truncating integer division.
func DayBucket(dt time.Time) int {
	const nanosPerDay = int64(24 * time.Hour)
	nanos := dt.UnixNano()
	return int(nanos / nanosPerDay)
}

// FilterGenerator converts a Filter expression (Node) into a generated
// statement for Elasticsearch.
type FilterGenerator struct {
// ts is the timestamp supplied to NewGenerator.
ts time.Time
// inc is the expr.Includer supplied to NewGenerator; presumably used to
// resolve included filters -- TODO confirm against usage.
inc expr.Includer
// schema is the column schema supplied to NewGenerator.
schema gentypes.SchemaColumns
// fg is the function generator registry; NewGenerator sets it to the
// package-level registry.
fg *gentypes.FuncGenRegistry
}

func NewGenerator(ts time.Time, inc expr.Includer, s gentypes.SchemaColumns) *FilterGenerator {
return &FilterGenerator{ts: ts, inc: inc, schema: s}
return &FilterGenerator{
ts: ts,
inc: inc,
schema: s,
fg: fg,
}
}

func (fg *FilterGenerator) fieldType(n expr.Node) (*gentypes.FieldType, error) {
Expand Down Expand Up @@ -95,7 +110,7 @@ func (fg *FilterGenerator) walkExpr(node expr.Node, depth int) (interface{}, err
filter, err = fg.funcExpr(n, depth+1)
default:
gou.Warnf("not handled %v", node)
return nil, fmt.Errorf("qlindex: unsupported node in expression: %T (%s)", node, node)
return nil, fmt.Errorf("unsupported node in expression: %T (%s)", node, node)
}
if err != nil {
// Convert MissingField errors to a logical `false`
Expand All @@ -116,15 +131,12 @@ func (fg *FilterGenerator) walkExpr(node expr.Node, depth int) (interface{}, err
}

func (fg *FilterGenerator) unaryExpr(node *expr.UnaryNode, depth int) (interface{}, error) {
//gou.Debugf("urnary %v", node.Operator.T.String())
switch node.Operator.T {
case lex.TokenExists:
ft, err := fg.fieldType(node.Arg)
if err != nil {
//gou.Debugf("exists err: %q", err)
return nil, err
}
//gou.Debugf("exists %s", ft)
return Exists(ft), nil

case lex.TokenNegate:
Expand All @@ -134,11 +146,11 @@ func (fg *FilterGenerator) unaryExpr(node *expr.UnaryNode, depth int) (interface
}
return NotFilter(inner), nil
default:
return nil, fmt.Errorf("qlindex: unsupported unary operator: %s", node.Operator.T)
return nil, fmt.Errorf("unsupported unary operator: %s", node.Operator.T)
}
}

// filters returns a boolean expression
// booleanExpr create ES query for given boolean expression
func (fg *FilterGenerator) booleanExpr(bn *expr.BooleanNode, depth int) (interface{}, error) {
if depth > MaxDepth {
return nil, fmt.Errorf("hit max depth on segment generation. bad query?")
Expand All @@ -149,7 +161,7 @@ func (fg *FilterGenerator) booleanExpr(bn *expr.BooleanNode, depth int) (interfa
case lex.TokenOr, lex.TokenLogicOr:
and = false
default:
return nil, fmt.Errorf("qlindex: unexpected op %v", bn.Operator)
return nil, fmt.Errorf("unexpected op %v", bn.Operator)
}

items := make([]interface{}, 0, len(bn.Args))
Expand Down Expand Up @@ -201,19 +213,18 @@ func (fg *FilterGenerator) binaryExpr(node *expr.BinaryNode, depth int) (interfa
case lex.TokenEqual, lex.TokenEqualEqual: // the VM supports both = and ==
rhs, ok := scalar(node.Args[1])
if !ok {
return nil, fmt.Errorf("qlindex: unsupported second argument for equality: %T", node.Args[1])
return nil, fmt.Errorf("unsupported second argument for equality: %T", node.Args[1])
}
if lhs.Nested() {
fieldName, _ := lhs.PrefixAndValue(rhs)
return Nested(lhs, Term(fieldName, rhs)), nil
//return nil, fmt.Errorf("qlindex: == not supported for nested types %q", lhs.String())
}
return Term(lhs.Field, rhs), nil

case lex.TokenNE: // ident(0) != literal(1)
rhs, ok := scalar(node.Args[1])
if !ok {
return nil, fmt.Errorf("qlindex: unsupported second argument for equality: %T", node.Args[1])
return nil, fmt.Errorf("unsupported second argument for equality: %T", node.Args[1])
}
if lhs.Nested() {
fieldName, _ := lhs.PrefixAndValue(rhs)
Expand All @@ -231,7 +242,7 @@ func (fg *FilterGenerator) binaryExpr(node *expr.BinaryNode, depth int) (interfa
case *expr.NumberNode:
rhsstr = rhst.Text
default:
return nil, fmt.Errorf("qlindex: unsupported non-string argument for CONTAINS pattern: %T", node.Args[1])
return nil, fmt.Errorf("unsupported non-string argument for CONTAINS pattern: %T", node.Args[1])
}
return makeWildcard(lhs, rhsstr)

Expand All @@ -245,29 +256,29 @@ func (fg *FilterGenerator) binaryExpr(node *expr.BinaryNode, depth int) (interfa
case *expr.NumberNode:
rhsstr = rhst.Text
default:
return nil, fmt.Errorf("qlindex: unsupported non-string argument for LIKE pattern: %T", node.Args[1])
return nil, fmt.Errorf("unsupported non-string argument for LIKE pattern: %T", node.Args[1])
}
return makeWildcard(lhs, rhsstr)

case lex.TokenIN, lex.TokenIntersects:
// Build up list of arguments
array, ok := node.Args[1].(*expr.ArrayNode)
if !ok {
return nil, fmt.Errorf("qlindex: second argument to %s must be an array, found: %T", op, node.Args[1])
return nil, fmt.Errorf("second argument to %s must be an array, found: %T", op, node.Args[1])
}
args := make([]interface{}, 0, len(array.Args))
for _, nodearg := range array.Args {
strarg, ok := scalar(nodearg)
if !ok {
return nil, fmt.Errorf("qlindex: non-scalar argument in %s clause: %T", op, nodearg)
return nil, fmt.Errorf("non-scalar argument in %s clause: %T", op, nodearg)
}
args = append(args, strarg)
}

return In(lhs, args), nil

default:
return nil, fmt.Errorf("qlindex: unsupported binary expression: %s", op)
return nil, fmt.Errorf("unsupported binary expression: %s", op)
}
}

Expand All @@ -282,15 +293,15 @@ func (fg *FilterGenerator) triExpr(node *expr.TriNode, depth int) (interface{},
}
lower, ok := scalar(node.Args[1])
if !ok {
return nil, fmt.Errorf("qlindex: unsupported type for first argument of BETWEEN expression: %T", node.Args[1])
return nil, fmt.Errorf("unsupported type for first argument of BETWEEN expression: %T", node.Args[1])
}
upper, ok := scalar(node.Args[2])
if !ok {
return nil, fmt.Errorf("qlindex: unsupported type for second argument of BETWEEN expression: %T", node.Args[1])
return nil, fmt.Errorf("unsupported type for second argument of BETWEEN expression: %T", node.Args[1])
}
return makeBetween(lhs, lower, upper)
}
return nil, fmt.Errorf("qlindex: unsupported ternary expression: %s", node.Operator.T)
return nil, fmt.Errorf("unsupported ternary expression: %s", node.Operator.T)
}

func (fg *FilterGenerator) funcExpr(node *expr.FuncNode, depth int) (interface{}, error) {
Expand All @@ -299,7 +310,7 @@ func (fg *FilterGenerator) funcExpr(node *expr.FuncNode, depth int) (interface{}
// see entity.EvalTimeWindow for code implementation. Checks if the contextual time is within the time buckets provided
// by the parameters
if len(node.Args) != 3 {
return nil, fmt.Errorf("qlindex: 'timewindow' function requires 3 arguments, got %d", len(node.Args))
return nil, fmt.Errorf("'timewindow' function requires 3 arguments, got %d", len(node.Args))
}
// We are applying the function to the named field, but the caller *can't* just use the fieldname (which would
// evaluate to nothing, as the field isn't
Expand Down
6 changes: 3 additions & 3 deletions generators/elasticsearch/es2gen/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ func fieldType(s gentypes.SchemaColumns, n expr.Node) (*gentypes.FieldType, erro

ident, ok := n.(*expr.IdentityNode)
if !ok {
return nil, fmt.Errorf("qlindex: expected an identity but found %T (%s)", n, n)
return nil, fmt.Errorf("expected an identity but found %T (%s)", n, n)
}

// This shotgun approach sucks, see https://github.com/lytics/lio/issues/7565
// TODO: This shotgun approach sucks, see https://github.com/araddon/qlbridge/issues/159
ft, ok := s.ColumnInfo(ident.Text)
if ok {
return ft, nil
Expand All @@ -30,7 +30,7 @@ func fieldType(s gentypes.SchemaColumns, n expr.Node) (*gentypes.FieldType, erro
}

// This is legacy crap, we stupidly used to allow this:
// ticket to remove https://github.com/lytics/lio/issues/7565
// ticket to remove https://github.com/araddon/qlbridge/issues/159
//
// `key_name.field value` -> "key_name", "field value"
//
Expand Down
2 changes: 1 addition & 1 deletion generators/elasticsearch/esgen/bridgeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func makeRange(lhs *gentypes.FieldType, op lex.TokenType, rhs expr.Node) (interf
case lex.TokenLT:
r.Range = map[string]RangeQry{fieldName: {LT: rhsval}}
default:
return nil, fmt.Errorf("qlindex: unsupported range operator %s", op)
return nil, fmt.Errorf("unsupported range operator %s", op)
}
if lhs.Nested() {
return Nested(lhs, r), nil
Expand Down
Loading