diff --git a/.env.example b/.env.example index 7cdeea5..992707a 100644 --- a/.env.example +++ b/.env.example @@ -2,15 +2,15 @@ PG_USER= PG_PASSWORD= PG_DATABASE= -API_KEYS= - -# CloudSQL DB var -DB_INSTANCE_CONNECTION_NAME= +API_KEYS=aaaa # Local DB Vars PG_HOST= PG_PORT= +# Prod DB vars +DB_INSTANCE_CONNECTION_NAME= + # Optional vars (will be set to default if not set) PORT=8080 MAX_RELAY_BATCH_SIZE=1000 diff --git a/.github/workflows/lint-test.yml b/.github/workflows/lint-test.yml index 73b507a..8194f1b 100644 --- a/.github/workflows/lint-test.yml +++ b/.github/workflows/lint-test.yml @@ -39,5 +39,5 @@ jobs: restore-keys: | ${{ runner.os }}-go- - - name: Run Unit tests - run: go test ./... + - name: Run tests + run: make test diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ee85456..a5b902e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,6 +16,7 @@ repos: - id: go-critic - id: go-build - id: go-mod-tidy + - id: go-unit-tests - repo: https://github.com/Yelp/detect-secrets rev: v1.4.0 hooks: diff --git a/Dockerfile.testdb b/Dockerfile.testdb new file mode 100644 index 0000000..dc0d8b0 --- /dev/null +++ b/Dockerfile.testdb @@ -0,0 +1,4 @@ +# This Dockerfile used to build the image used for testing TxDB +FROM postgres:14.3 + +COPY ./postgres-driver/sqlc/schema.sql /docker-entrypoint-initdb.d/ diff --git a/Makefile b/Makefile index f39abfc..08e8dea 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,45 @@ +SHELL := /bin/bash + +gen: gen_sql gen_mocks + +gen_sql: + sqlc generate -f ./postgres-driver/sqlc/sqlc.yaml + gen_mocks: mockery --name=Driver --recursive --inpkg --case=underscore mockery --name=RelayWriter --recursive --inpkg --case=underscore mockery --name=ServiceRecordWriter --recursive --inpkg --case=underscore +test: test_unit test_env_up run_driver_tests test_env_down + +test_unit: + @echo "๐Ÿ›  Running Unit tests..." + @go test ./... -short || true + @echo "โœ… Unit tests completed!" + +test_env_up: + @echo "๐Ÿงช Starting up Transaction DB test database ..." + @docker-compose -f ./testdata/docker-compose.test.yml up -d --remove-orphans --build + @echo "โณ Waiting for test DB to be ready ..." + @attempts=0; while ! pg_isready -h localhost -p 5432 -U postgres -d postgres >/dev/null && [[ $$attempts -lt 5 ]]; do sleep 1; attempts=$$(($$attempts + 1)); done + @[[ $$attempts -lt 5 ]] && echo "๐Ÿ˜ Test Transaction DB is up ..." || (echo "โŒ Test Transaction DB failed to start" && make test_env_down >/dev/null && exit 1) + @sleep 2; + @echo "๐Ÿš€ Test environment is up ..." +test_env_down: + @echo "๐Ÿงช Shutting down Portal HTTP DB test environment ..." + @docker-compose -f ./testdata/docker-compose.test.yml down --remove-orphans >/dev/null + @echo "โœ… Test environment is down." + +run_driver_tests: + @echo "๐Ÿš— Running PGDriver tests..." + @go test ./... -run Test_RunPGDriverSuite -count=1 || true + @echo "โœ… PGDriver tests completed!" +run_driver_tests_ci: + go test ./... -run Test_RunPGDriverSuite -count=1 + init-pre-commit: wget https://github.com/pre-commit/pre-commit/releases/download/v2.20.0/pre-commit-2.20.0.pyz python3 pre-commit-2.20.0.pyz install - python3 pre-commit-2.20.0.pyz autoupdate go install golang.org/x/tools/cmd/goimports@latest go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest go install -v github.com/go-critic/go-critic/cmd/gocritic@latest diff --git a/batch/batch_mock.go b/batch/batch_mock.go index e85f1e1..60de1c3 100644 --- a/batch/batch_mock.go +++ b/batch/batch_mock.go @@ -3,7 +3,7 @@ package batch import ( context "context" - types "github.com/pokt-foundation/transaction-db/types" + types "github.com/pokt-foundation/transaction-http-db/types" mock "github.com/stretchr/testify/mock" ) diff --git a/batch/batch_test.go b/batch/batch_test.go index cb76eb8..b83548f 100644 --- a/batch/batch_test.go +++ b/batch/batch_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/pokt-foundation/transaction-db/types" + "github.com/pokt-foundation/transaction-http-db/types" mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" diff --git a/go.mod b/go.mod index 5765748..32ad8a4 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,17 @@ module github.com/pokt-foundation/transaction-http-db go 1.21 require ( + cloud.google.com/go/cloudsqlconn v1.3.0 github.com/gorilla/mux v1.8.0 - github.com/pokt-foundation/transaction-db v1.23.0 + github.com/jackc/pgx/v5 v5.4.3 github.com/pokt-foundation/utils-go v0.11.1 github.com/stretchr/testify v1.8.2 go.uber.org/zap v1.24.0 golang.org/x/sync v0.3.0 + google.golang.org/grpc v1.59.0 ) require ( - cloud.google.com/go/cloudsqlconn v1.3.0 // indirect cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -24,9 +25,9 @@ require ( github.com/googleapis/gax-go/v2 v2.11.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgx/v5 v5.4.3 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/joho/godotenv v1.5.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/stretchr/objx v0.5.0 // indirect @@ -42,7 +43,6 @@ require ( google.golang.org/api v0.126.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect - google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8278b8d..3e10dd6 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,7 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -109,8 +110,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/pokt-foundation/transaction-db v1.23.0 h1:b848mUg6AkpYqLL8yDDz6cQX/oqcqdCxYE3Sc3nS81c= -github.com/pokt-foundation/transaction-db v1.23.0/go.mod h1:JEpKrqtjO6b4CiDb70bj7s1gn8cIfwlexE4HosL430U= github.com/pokt-foundation/utils-go v0.11.1 h1:o/kF4KFaClAz2AvybDrEQR1TpEEx6zwyuGcOlD/aLuY= github.com/pokt-foundation/utils-go v0.11.1/go.mod h1:YZDpKHum+UINTYIzFJvLdlne2ksnlBonszqZQ7mkKnU= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/main.go b/main.go index 662e7c6..5ecf47c 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,8 @@ import ( "syscall" "time" - postgresdriver "github.com/pokt-foundation/transaction-db/postgres-driver" "github.com/pokt-foundation/transaction-http-db/batch" + postgresdriver "github.com/pokt-foundation/transaction-http-db/postgres-driver" "github.com/pokt-foundation/transaction-http-db/router" "github.com/pokt-foundation/utils-go/environment" "go.uber.org/zap" diff --git a/postgres-driver/copyfrom.go b/postgres-driver/copyfrom.go new file mode 100644 index 0000000..967303b --- /dev/null +++ b/postgres-driver/copyfrom.go @@ -0,0 +1,120 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.20.0 +// source: copyfrom.go + +package postgresdriver + +import ( + "context" +) + +// iteratorForInsertRelays implements pgx.CopyFromSource. +type iteratorForInsertRelays struct { + rows []InsertRelaysParams + skippedFirstNextCall bool +} + +func (r *iteratorForInsertRelays) Next() bool { + if len(r.rows) == 0 { + return false + } + if !r.skippedFirstNextCall { + r.skippedFirstNextCall = true + return true + } + r.rows = r.rows[1:] + return len(r.rows) > 0 +} + +func (r iteratorForInsertRelays) Values() ([]interface{}, error) { + return []interface{}{ + r.rows[0].PoktChainID, + r.rows[0].EndpointID, + r.rows[0].SessionKey, + r.rows[0].ProtocolAppPublicKey, + r.rows[0].RelaySourceUrl, + r.rows[0].PoktNodeAddress, + r.rows[0].PoktNodeDomain, + r.rows[0].PoktNodePublicKey, + r.rows[0].RelayStartDatetime, + r.rows[0].RelayReturnDatetime, + r.rows[0].IsError, + r.rows[0].ErrorCode, + r.rows[0].ErrorName, + r.rows[0].ErrorMessage, + r.rows[0].ErrorSource, + r.rows[0].ErrorType, + r.rows[0].RelayRoundtripTime, + r.rows[0].RelayChainMethodIds, + r.rows[0].RelayDataSize, + r.rows[0].RelayPortalTripTime, + r.rows[0].RelayNodeTripTime, + r.rows[0].RelayUrlIsPublicEndpoint, + r.rows[0].PortalRegionName, + r.rows[0].IsAltruistRelay, + r.rows[0].IsUserRelay, + r.rows[0].RequestID, + r.rows[0].PoktTxID, + r.rows[0].GigastakeAppID, + r.rows[0].CreatedAt, + r.rows[0].UpdatedAt, + r.rows[0].BlockingPlugin, + }, nil +} + +func (r iteratorForInsertRelays) Err() error { + return nil +} + +func (q *Queries) InsertRelays(ctx context.Context, arg []InsertRelaysParams) (int64, error) { + return q.db.CopyFrom(ctx, []string{"relay"}, []string{"pokt_chain_id", "endpoint_id", "session_key", "protocol_app_public_key", "relay_source_url", "pokt_node_address", "pokt_node_domain", "pokt_node_public_key", "relay_start_datetime", "relay_return_datetime", "is_error", "error_code", "error_name", "error_message", "error_source", "error_type", "relay_roundtrip_time", "relay_chain_method_ids", "relay_data_size", "relay_portal_trip_time", "relay_node_trip_time", "relay_url_is_public_endpoint", "portal_region_name", "is_altruist_relay", "is_user_relay", "request_id", "pokt_tx_id", "gigastake_app_id", "created_at", "updated_at", "blocking_plugin"}, &iteratorForInsertRelays{rows: arg}) +} + +// iteratorForInsertServiceRecords implements pgx.CopyFromSource. +type iteratorForInsertServiceRecords struct { + rows []InsertServiceRecordsParams + skippedFirstNextCall bool +} + +func (r *iteratorForInsertServiceRecords) Next() bool { + if len(r.rows) == 0 { + return false + } + if !r.skippedFirstNextCall { + r.skippedFirstNextCall = true + return true + } + r.rows = r.rows[1:] + return len(r.rows) > 0 +} + +func (r iteratorForInsertServiceRecords) Values() ([]interface{}, error) { + return []interface{}{ + r.rows[0].NodePublicKey, + r.rows[0].PoktChainID, + r.rows[0].SessionKey, + r.rows[0].RequestID, + r.rows[0].PortalRegionName, + r.rows[0].Latency, + r.rows[0].Tickets, + r.rows[0].Result, + r.rows[0].Available, + r.rows[0].Successes, + r.rows[0].Failures, + r.rows[0].P90SuccessLatency, + r.rows[0].MedianSuccessLatency, + r.rows[0].WeightedSuccessLatency, + r.rows[0].SuccessRate, + r.rows[0].CreatedAt, + r.rows[0].UpdatedAt, + }, nil +} + +func (r iteratorForInsertServiceRecords) Err() error { + return nil +} + +func (q *Queries) InsertServiceRecords(ctx context.Context, arg []InsertServiceRecordsParams) (int64, error) { + return q.db.CopyFrom(ctx, []string{"service_record"}, []string{"node_public_key", "pokt_chain_id", "session_key", "request_id", "portal_region_name", "latency", "tickets", "result", "available", "successes", "failures", "p90_success_latency", "median_success_latency", "weighted_success_latency", "success_rate", "created_at", "updated_at"}, &iteratorForInsertServiceRecords{rows: arg}) +} diff --git a/postgres-driver/db.generated.go b/postgres-driver/db.generated.go new file mode 100644 index 0000000..3ec16ea --- /dev/null +++ b/postgres-driver/db.generated.go @@ -0,0 +1,33 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.20.0 + +package postgresdriver + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row + CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/postgres-driver/models.generated.go b/postgres-driver/models.generated.go new file mode 100644 index 0000000..39ae5de --- /dev/null +++ b/postgres-driver/models.generated.go @@ -0,0 +1,123 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.20.0 + +package postgresdriver + +import ( + "database/sql/driver" + "fmt" + + "github.com/jackc/pgx/v5/pgtype" +) + +type ErrorSourcesEnum string + +const ( + ErrorSourcesEnumInternal ErrorSourcesEnum = "internal" + ErrorSourcesEnumExternal ErrorSourcesEnum = "external" +) + +func (e *ErrorSourcesEnum) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = ErrorSourcesEnum(s) + case string: + *e = ErrorSourcesEnum(s) + default: + return fmt.Errorf("unsupported scan type for ErrorSourcesEnum: %T", src) + } + return nil +} + +type NullErrorSourcesEnum struct { + ErrorSourcesEnum ErrorSourcesEnum `json:"errorSourcesEnum"` + Valid bool `json:"valid"` // Valid is true if ErrorSourcesEnum is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullErrorSourcesEnum) Scan(value interface{}) error { + if value == nil { + ns.ErrorSourcesEnum, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.ErrorSourcesEnum.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullErrorSourcesEnum) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.ErrorSourcesEnum), nil +} + +type PocketSession struct { + ID int64 `json:"id"` + SessionKey string `json:"sessionKey"` + SessionHeight int32 `json:"sessionHeight"` + PortalRegionName string `json:"portalRegionName"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` +} + +type PortalRegion struct { + PortalRegionName string `json:"portalRegionName"` +} + +type Relay struct { + ID int64 `json:"id"` + PoktChainID string `json:"poktChainId"` + EndpointID string `json:"endpointId"` + SessionKey string `json:"sessionKey"` + ProtocolAppPublicKey string `json:"protocolAppPublicKey"` + RelaySourceUrl pgtype.Text `json:"relaySourceUrl"` + PoktNodeAddress pgtype.Text `json:"poktNodeAddress"` + PoktNodeDomain pgtype.Text `json:"poktNodeDomain"` + PoktNodePublicKey pgtype.Text `json:"poktNodePublicKey"` + RelayStartDatetime pgtype.Timestamp `json:"relayStartDatetime"` + RelayReturnDatetime pgtype.Timestamp `json:"relayReturnDatetime"` + IsError bool `json:"isError"` + ErrorCode pgtype.Int4 `json:"errorCode"` + ErrorName pgtype.Text `json:"errorName"` + ErrorMessage pgtype.Text `json:"errorMessage"` + ErrorSource NullErrorSourcesEnum `json:"errorSource"` + ErrorType pgtype.Text `json:"errorType"` + RelayRoundtripTime float64 `json:"relayRoundtripTime"` + RelayChainMethodIds string `json:"relayChainMethodIds"` + RelayDataSize int32 `json:"relayDataSize"` + RelayPortalTripTime float64 `json:"relayPortalTripTime"` + RelayNodeTripTime float64 `json:"relayNodeTripTime"` + RelayUrlIsPublicEndpoint bool `json:"relayUrlIsPublicEndpoint"` + PortalRegionName string `json:"portalRegionName"` + IsAltruistRelay bool `json:"isAltruistRelay"` + IsUserRelay bool `json:"isUserRelay"` + RequestID string `json:"requestId"` + PoktTxID pgtype.Text `json:"poktTxId"` + GigastakeAppID pgtype.Text `json:"gigastakeAppId"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` + BlockingPlugin pgtype.Text `json:"blockingPlugin"` +} + +type ServiceRecord struct { + ID int64 `json:"id"` + NodePublicKey string `json:"nodePublicKey"` + PoktChainID string `json:"poktChainId"` + SessionKey string `json:"sessionKey"` + RequestID string `json:"requestId"` + PortalRegionName string `json:"portalRegionName"` + Latency float64 `json:"latency"` + Tickets int32 `json:"tickets"` + Result string `json:"result"` + Available bool `json:"available"` + Successes int32 `json:"successes"` + Failures int32 `json:"failures"` + P90SuccessLatency float64 `json:"p90SuccessLatency"` + MedianSuccessLatency float64 `json:"medianSuccessLatency"` + WeightedSuccessLatency float64 `json:"weightedSuccessLatency"` + SuccessRate float64 `json:"successRate"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` +} diff --git a/postgres-driver/pocket_session.go b/postgres-driver/pocket_session.go new file mode 100644 index 0000000..16eaff2 --- /dev/null +++ b/postgres-driver/pocket_session.go @@ -0,0 +1,34 @@ +package postgresdriver + +import ( + "context" + "strings" + "time" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +const ( + errMessageDuplicateSessionKey = `duplicate key value violates unique constraint "pocket_session_session_key_key"` +) + +func (d *PostgresDriver) WriteSession(ctx context.Context, session types.PocketSession) error { + now := time.Now() + + err := d.InsertPocketSession(ctx, InsertPocketSessionParams{ + SessionKey: session.SessionKey, + SessionHeight: int32(session.SessionHeight), + PortalRegionName: session.PortalRegionName, + CreatedAt: newTimestamp(now), + UpdatedAt: newTimestamp(now), + }) + if err != nil { + if strings.Contains(err.Error(), errMessageDuplicateSessionKey) { + return types.ErrRepeatedSessionKey + } + + return err + } + + return nil +} diff --git a/postgres-driver/pocket_session_test.go b/postgres-driver/pocket_session_test.go new file mode 100644 index 0000000..203b723 --- /dev/null +++ b/postgres-driver/pocket_session_test.go @@ -0,0 +1,29 @@ +package postgresdriver + +import ( + "context" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +func (ts *PGDriverTestSuite) TestPostgresDriver_WriteSession() { + tests := []struct { + name string + session types.PocketSession + err error + }{ + { + name: "Success", + session: types.PocketSession{ + SessionKey: "21", + SessionHeight: 21, + PortalRegionName: "europe-southwest1", + }, + err: nil, + }, + } + for _, tt := range tests { + ts.Equal(ts.driver.WriteSession(context.Background(), tt.session), tt.err) + ts.Equal(ts.driver.WriteSession(context.Background(), tt.session), types.ErrRepeatedSessionKey) + } +} diff --git a/postgres-driver/portal_region.go b/postgres-driver/portal_region.go new file mode 100644 index 0000000..1fd7a67 --- /dev/null +++ b/postgres-driver/portal_region.go @@ -0,0 +1,11 @@ +package postgresdriver + +import ( + "context" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +func (d *PostgresDriver) WriteRegion(ctx context.Context, region types.PortalRegion) error { + return d.InsertPortalRegion(ctx, region.PortalRegionName) +} diff --git a/postgres-driver/portal_region_test.go b/postgres-driver/portal_region_test.go new file mode 100644 index 0000000..f5058d5 --- /dev/null +++ b/postgres-driver/portal_region_test.go @@ -0,0 +1,26 @@ +package postgresdriver + +import ( + "context" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +func (ts *PGDriverTestSuite) TestPostgresDriver_WriteRegion() { + tests := []struct { + name string + region types.PortalRegion + err error + }{ + { + name: "Success", + region: types.PortalRegion{ + PortalRegionName: "Cartago", + }, + err: nil, + }, + } + for _, tt := range tests { + ts.Equal(ts.driver.WriteRegion(context.Background(), tt.region), tt.err) + } +} diff --git a/postgres-driver/postgresdriver.go b/postgres-driver/postgresdriver.go new file mode 100644 index 0000000..9b07a86 --- /dev/null +++ b/postgres-driver/postgresdriver.go @@ -0,0 +1,230 @@ +package postgresdriver + +import ( + "context" + "errors" + "fmt" + "net" + "time" + + "cloud.google.com/go/cloudsqlconn" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// The PostgresDriver struct satisfies the Driver interface which defines all database driver methods +type ( + PostgresDriver struct { + *Queries + db *pgxpool.Pool + } + + CloudSQLConfig struct { + DBUser string + DBPassword string + DBName string + InstanceConnectionName string + PublicIP string + PrivateIP string + } +) + +var ( + errCantSetPrivateandPublicIP error = errors.New("cannot set both private and public IP; must set one or the other") +) + +/* ---------- Postgres Connection Funcs ---------- */ + +/* <--------- PGX Pool Connection ---------> */ + +/* +NewCloudSQLPostgresDriver +- Creates a pool of connections to a Cloud SQL instance using the provided CloudSQLConfig. +- Uses the Cloud SQL Connector for Go and pgx for the connections. +- Establishes a dialer with the desired options (like using private IP). +- For each acquired connection from the pool, custom enum types are registered.. +- It is important to note that this function will return an error if both PublicIP and PrivateIP are provided in the CloudSQLConfig. +*/ +func NewCloudSQLPostgresDriver(options CloudSQLConfig) (*PostgresDriver, func() error, error) { + if options.PublicIP != "" && options.PrivateIP != "" { + return nil, nil, errCantSetPrivateandPublicIP + } + + dsn := fmt.Sprintf("user=%s password=%s dbname=%s", options.DBUser, options.DBPassword, options.DBName) + + config, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, nil, err + } + + var opts []cloudsqlconn.Option + if options.PublicIP != "" { + opts = append(opts, cloudsqlconn.WithDefaultDialOptions(cloudsqlconn.WithPublicIP())) + } + if options.PrivateIP != "" { + opts = append(opts, cloudsqlconn.WithDefaultDialOptions(cloudsqlconn.WithPrivateIP())) + } + + d, err := cloudsqlconn.NewDialer(context.Background(), opts...) + if err != nil { + return nil, nil, err + } + + config.ConnConfig.DialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) { + return d.Dial(ctx, options.InstanceConnectionName) + } + pool, err := createAndConfigurePool(config) + if err != nil { + return nil, nil, err + } + + cleanup := func() error { + pool.Close() + return d.Close() + } + + driver := &PostgresDriver{ + Queries: New(pool), + db: pool, + } + + return driver, cleanup, nil +} + +/* +NewPostgresDriver +- Creates a pool of connections to a PostgreSQL database using the provided connection string. +- Parses the connection string into a pgx pool configuration object. +- For each acquired connection from the pool, custom enum types are registered. +- Returns the established connection pool. +- This function is ideal for creating multiple reusable connections to a PostgreSQL database, particularly useful for handling multiple concurrent database operations. +*/ +func NewPostgresDriver(connectionString string) (*PostgresDriver, func() error, error) { + config, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, nil, err + } + + pool, err := createAndConfigurePool(config) + if err != nil { + return nil, nil, err + } + + cleanup := func() error { + pool.Close() + return nil + } + + driver := &PostgresDriver{ + Queries: New(pool), + db: pool, + } + + return driver, cleanup, nil +} + +// Configures the connection pool with custom enum types. +func createAndConfigurePool(config *pgxpool.Config) (*pgxpool.Pool, error) { + pool, err := pgxpool.NewWithConfig(context.Background(), config) + if err != nil { + return nil, fmt.Errorf("pgxpool.NewWithConfig: %v", err) + } + + // Collect the custom data types once, store them in memory, and register them for every future connection. + customTypes, err := getCustomDataTypes(context.Background(), pool) + if err != nil { + return nil, err + } + config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + for _, t := range customTypes { + conn.TypeMap().RegisterType(t) + } + return nil + } + + // Immediately close the old pool and open a new one with the new config. + pool.Close() + pool, err = pgxpool.NewWithConfig(context.Background(), config) + if err != nil { + return nil, err + } + + return pool, nil +} + +// Any custom DB types made with CREATE TYPE need to be registered with pgx. +// https://github.com/kyleconroy/sqlc/issues/2116 +// https://stackoverflow.com/questions/75658429/need-to-update-psql-row-of-a-composite-type-in-golang-with-jack-pgx +// https://pkg.go.dev/github.com/jackc/pgx/v5/pgtype +func getCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) ([]*pgtype.Type, error) { + // Get a single connection just to load type information. + conn, err := pool.Acquire(ctx) + if err != nil { + return nil, err + } + defer conn.Release() + + dataTypeNames := []string{ + "error_sources_enum", + "_error_sources_enum", + } + + var typesToRegister []*pgtype.Type + for _, typeName := range dataTypeNames { + dataType, err := conn.Conn().LoadType(ctx, typeName) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to load type %s: %v", typeName, err) + } + // You need to register only for this connection too, otherwise the array type will look for the register element type. + conn.Conn().TypeMap().RegisterType(dataType) + typesToRegister = append(typesToRegister, dataType) + } + return typesToRegister, nil +} + +func newText(value string) pgtype.Text { + if value == "" { + return pgtype.Text{} + } + + return pgtype.Text{ + String: value, + Valid: true, + } +} + +func newInt4(value int32, allowZero bool) pgtype.Int4 { + if !allowZero && value == 0 { + return pgtype.Int4{} + } + + return pgtype.Int4{ + Int32: value, + Valid: true, + } +} + +func newTimestamp(value time.Time) pgtype.Timestamp { + if value.IsZero() { + return pgtype.Timestamp{} + } + + return pgtype.Timestamp{ + Time: value, + Valid: true, + } +} + +func newNullErrorSourcesEnum(e ErrorSourcesEnum) NullErrorSourcesEnum { + if e == "" { + return NullErrorSourcesEnum{} + } + + return NullErrorSourcesEnum{ + ErrorSourcesEnum: e, + Valid: true, + } +} diff --git a/postgres-driver/query.sql.generated.go b/postgres-driver/query.sql.generated.go new file mode 100644 index 0000000..a6281e0 --- /dev/null +++ b/postgres-driver/query.sql.generated.go @@ -0,0 +1,281 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.20.0 +// source: query.sql + +package postgresdriver + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const insertPocketSession = `-- name: InsertPocketSession :exec +INSERT INTO pocket_session ( + session_key, + session_height, + portal_region_name, + created_at, + updated_at + ) +VALUES ($1, $2, $3, $4, $5) +` + +type InsertPocketSessionParams struct { + SessionKey string `json:"sessionKey"` + SessionHeight int32 `json:"sessionHeight"` + PortalRegionName string `json:"portalRegionName"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` +} + +func (q *Queries) InsertPocketSession(ctx context.Context, arg InsertPocketSessionParams) error { + _, err := q.db.Exec(ctx, insertPocketSession, + arg.SessionKey, + arg.SessionHeight, + arg.PortalRegionName, + arg.CreatedAt, + arg.UpdatedAt, + ) + return err +} + +const insertPortalRegion = `-- name: InsertPortalRegion :exec +INSERT INTO portal_region (portal_region_name) +VALUES ($1) +` + +func (q *Queries) InsertPortalRegion(ctx context.Context, portalRegionName string) error { + _, err := q.db.Exec(ctx, insertPortalRegion, portalRegionName) + return err +} + +type InsertRelaysParams struct { + PoktChainID string `json:"poktChainId"` + EndpointID string `json:"endpointId"` + SessionKey string `json:"sessionKey"` + ProtocolAppPublicKey string `json:"protocolAppPublicKey"` + RelaySourceUrl pgtype.Text `json:"relaySourceUrl"` + PoktNodeAddress pgtype.Text `json:"poktNodeAddress"` + PoktNodeDomain pgtype.Text `json:"poktNodeDomain"` + PoktNodePublicKey pgtype.Text `json:"poktNodePublicKey"` + RelayStartDatetime pgtype.Timestamp `json:"relayStartDatetime"` + RelayReturnDatetime pgtype.Timestamp `json:"relayReturnDatetime"` + IsError bool `json:"isError"` + ErrorCode pgtype.Int4 `json:"errorCode"` + ErrorName pgtype.Text `json:"errorName"` + ErrorMessage pgtype.Text `json:"errorMessage"` + ErrorSource NullErrorSourcesEnum `json:"errorSource"` + ErrorType pgtype.Text `json:"errorType"` + RelayRoundtripTime float64 `json:"relayRoundtripTime"` + RelayChainMethodIds string `json:"relayChainMethodIds"` + RelayDataSize int32 `json:"relayDataSize"` + RelayPortalTripTime float64 `json:"relayPortalTripTime"` + RelayNodeTripTime float64 `json:"relayNodeTripTime"` + RelayUrlIsPublicEndpoint bool `json:"relayUrlIsPublicEndpoint"` + PortalRegionName string `json:"portalRegionName"` + IsAltruistRelay bool `json:"isAltruistRelay"` + IsUserRelay bool `json:"isUserRelay"` + RequestID string `json:"requestId"` + PoktTxID pgtype.Text `json:"poktTxId"` + GigastakeAppID pgtype.Text `json:"gigastakeAppId"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` + BlockingPlugin pgtype.Text `json:"blockingPlugin"` +} + +type InsertServiceRecordsParams struct { + NodePublicKey string `json:"nodePublicKey"` + PoktChainID string `json:"poktChainId"` + SessionKey string `json:"sessionKey"` + RequestID string `json:"requestId"` + PortalRegionName string `json:"portalRegionName"` + Latency float64 `json:"latency"` + Tickets int32 `json:"tickets"` + Result string `json:"result"` + Available bool `json:"available"` + Successes int32 `json:"successes"` + Failures int32 `json:"failures"` + P90SuccessLatency float64 `json:"p90SuccessLatency"` + MedianSuccessLatency float64 `json:"medianSuccessLatency"` + WeightedSuccessLatency float64 `json:"weightedSuccessLatency"` + SuccessRate float64 `json:"successRate"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` +} + +const selectRelay = `-- name: SelectRelay :one +SELECT r.id, + r.pokt_chain_id, + r.endpoint_id, + r.session_key, + r.protocol_app_public_key, + r.relay_source_url, + r.pokt_node_address, + r.pokt_node_domain, + r.pokt_node_public_key, + r.relay_start_datetime, + r.relay_return_datetime, + r.is_error, + r.error_code, + r.error_name, + r.error_message, + r.error_source, + r.error_type, + r.relay_roundtrip_time, + r.relay_chain_method_ids, + r.relay_data_size, + r.relay_portal_trip_time, + r.relay_node_trip_time, + r.relay_url_is_public_endpoint, + r.is_altruist_relay, + r.is_user_relay, + r.request_id, + r.pokt_tx_id, + r.gigastake_app_id, + r.created_at, + r.updated_at, + r.blocking_plugin, + ps.session_key, + ps.session_height, + ps.created_at, + ps.updated_at, + pr.portal_region_name +FROM relay r + INNER JOIN pocket_session ps ON ps.session_key = r.session_key + INNER JOIN portal_region pr ON pr.portal_region_name = r.portal_region_name +WHERE r.id = $1 +` + +type SelectRelayRow struct { + ID int64 `json:"id"` + PoktChainID string `json:"poktChainId"` + EndpointID string `json:"endpointId"` + SessionKey string `json:"sessionKey"` + ProtocolAppPublicKey string `json:"protocolAppPublicKey"` + RelaySourceUrl pgtype.Text `json:"relaySourceUrl"` + PoktNodeAddress pgtype.Text `json:"poktNodeAddress"` + PoktNodeDomain pgtype.Text `json:"poktNodeDomain"` + PoktNodePublicKey pgtype.Text `json:"poktNodePublicKey"` + RelayStartDatetime pgtype.Timestamp `json:"relayStartDatetime"` + RelayReturnDatetime pgtype.Timestamp `json:"relayReturnDatetime"` + IsError bool `json:"isError"` + ErrorCode pgtype.Int4 `json:"errorCode"` + ErrorName pgtype.Text `json:"errorName"` + ErrorMessage pgtype.Text `json:"errorMessage"` + ErrorSource NullErrorSourcesEnum `json:"errorSource"` + ErrorType pgtype.Text `json:"errorType"` + RelayRoundtripTime float64 `json:"relayRoundtripTime"` + RelayChainMethodIds string `json:"relayChainMethodIds"` + RelayDataSize int32 `json:"relayDataSize"` + RelayPortalTripTime float64 `json:"relayPortalTripTime"` + RelayNodeTripTime float64 `json:"relayNodeTripTime"` + RelayUrlIsPublicEndpoint bool `json:"relayUrlIsPublicEndpoint"` + IsAltruistRelay bool `json:"isAltruistRelay"` + IsUserRelay bool `json:"isUserRelay"` + RequestID string `json:"requestId"` + PoktTxID pgtype.Text `json:"poktTxId"` + GigastakeAppID pgtype.Text `json:"gigastakeAppId"` + CreatedAt pgtype.Timestamp `json:"createdAt"` + UpdatedAt pgtype.Timestamp `json:"updatedAt"` + BlockingPlugin pgtype.Text `json:"blockingPlugin"` + SessionKey_2 string `json:"sessionKey2"` + SessionHeight int32 `json:"sessionHeight"` + CreatedAt_2 pgtype.Timestamp `json:"createdAt2"` + UpdatedAt_2 pgtype.Timestamp `json:"updatedAt2"` + PortalRegionName string `json:"portalRegionName"` +} + +func (q *Queries) SelectRelay(ctx context.Context, id int64) (SelectRelayRow, error) { + row := q.db.QueryRow(ctx, selectRelay, id) + var i SelectRelayRow + err := row.Scan( + &i.ID, + &i.PoktChainID, + &i.EndpointID, + &i.SessionKey, + &i.ProtocolAppPublicKey, + &i.RelaySourceUrl, + &i.PoktNodeAddress, + &i.PoktNodeDomain, + &i.PoktNodePublicKey, + &i.RelayStartDatetime, + &i.RelayReturnDatetime, + &i.IsError, + &i.ErrorCode, + &i.ErrorName, + &i.ErrorMessage, + &i.ErrorSource, + &i.ErrorType, + &i.RelayRoundtripTime, + &i.RelayChainMethodIds, + &i.RelayDataSize, + &i.RelayPortalTripTime, + &i.RelayNodeTripTime, + &i.RelayUrlIsPublicEndpoint, + &i.IsAltruistRelay, + &i.IsUserRelay, + &i.RequestID, + &i.PoktTxID, + &i.GigastakeAppID, + &i.CreatedAt, + &i.UpdatedAt, + &i.BlockingPlugin, + &i.SessionKey_2, + &i.SessionHeight, + &i.CreatedAt_2, + &i.UpdatedAt_2, + &i.PortalRegionName, + ) + return i, err +} + +const selectServiceRecord = `-- name: SelectServiceRecord :one +SELECT id, + node_public_key, + pokt_chain_id, + session_key, + request_id, + portal_region_name, + latency, + tickets, + result, + available, + successes, + failures, + p90_success_latency, + median_success_latency, + weighted_success_latency, + success_rate, + created_at, + updated_at +FROM service_record +WHERE id = $1 +` + +func (q *Queries) SelectServiceRecord(ctx context.Context, id int64) (ServiceRecord, error) { + row := q.db.QueryRow(ctx, selectServiceRecord, id) + var i ServiceRecord + err := row.Scan( + &i.ID, + &i.NodePublicKey, + &i.PoktChainID, + &i.SessionKey, + &i.RequestID, + &i.PortalRegionName, + &i.Latency, + &i.Tickets, + &i.Result, + &i.Available, + &i.Successes, + &i.Failures, + &i.P90SuccessLatency, + &i.MedianSuccessLatency, + &i.WeightedSuccessLatency, + &i.SuccessRate, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} diff --git a/postgres-driver/relay.go b/postgres-driver/relay.go new file mode 100644 index 0000000..1de5e94 --- /dev/null +++ b/postgres-driver/relay.go @@ -0,0 +1,149 @@ +package postgresdriver + +import ( + "context" + "strings" + "time" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +const chainMethodIDSeparator = "," + +func (d *PostgresDriver) WriteRelay(ctx context.Context, relay types.Relay) error { + createdAt := time.Now() + + _, err := d.InsertRelays(ctx, []InsertRelaysParams{ + { + PoktChainID: relay.PoktChainID, + EndpointID: relay.EndpointID, + SessionKey: relay.SessionKey, + ProtocolAppPublicKey: relay.ProtocolAppPublicKey, + RelaySourceUrl: newText(relay.RelaySourceURL), + PoktNodeAddress: newText(relay.PoktNodeAddress), + PoktNodeDomain: newText(relay.PoktNodeDomain), + PoktNodePublicKey: newText(relay.PoktNodePublicKey), + RelayStartDatetime: newTimestamp(relay.RelayStartDatetime), + RelayReturnDatetime: newTimestamp(relay.RelayReturnDatetime), + IsError: relay.IsError, + ErrorCode: newInt4(int32(relay.ErrorCode), false), + ErrorName: newText(relay.ErrorName), + ErrorMessage: newText(relay.ErrorMessage), + ErrorType: newText(relay.ErrorType), + ErrorSource: newNullErrorSourcesEnum(ErrorSourcesEnum(relay.ErrorSource)), + RelayRoundtripTime: relay.RelayRoundtripTime, + RelayChainMethodIds: strings.Join(relay.RelayChainMethodIDs, chainMethodIDSeparator), + RelayDataSize: int32(relay.RelayDataSize), + RelayPortalTripTime: relay.RelayPortalTripTime, + RelayNodeTripTime: relay.RelayNodeTripTime, + RelayUrlIsPublicEndpoint: relay.RelayURLIsPublicEndpoint, + PortalRegionName: relay.PortalRegionName, + IsAltruistRelay: relay.IsAltruistRelay, + RequestID: relay.RequestID, + PoktTxID: newText(relay.PoktTxID), + IsUserRelay: relay.IsUserRelay, + GigastakeAppID: newText(relay.GigastakeAppID), + CreatedAt: newTimestamp(createdAt), + UpdatedAt: newTimestamp(createdAt), + BlockingPlugin: newText(relay.BlockingPlugin), + }, + }) + + return err +} + +func (d *PostgresDriver) WriteRelays(ctx context.Context, relays []*types.Relay) error { + createdAt := time.Now() + + relayParams := make([]InsertRelaysParams, 0, len(relays)) + + for _, relay := range relays { + relayParams = append(relayParams, InsertRelaysParams{ + PoktChainID: relay.PoktChainID, + EndpointID: relay.EndpointID, + SessionKey: relay.SessionKey, + ProtocolAppPublicKey: relay.ProtocolAppPublicKey, + RelaySourceUrl: newText(relay.RelaySourceURL), + PoktNodeAddress: newText(relay.PoktNodeAddress), + PoktNodeDomain: newText(relay.PoktNodeDomain), + PoktNodePublicKey: newText(relay.PoktNodePublicKey), + RelayStartDatetime: newTimestamp(relay.RelayStartDatetime), + RelayReturnDatetime: newTimestamp(relay.RelayReturnDatetime), + IsError: relay.IsError, + ErrorCode: newInt4(int32(relay.ErrorCode), false), + ErrorName: newText(relay.ErrorName), + ErrorMessage: newText(relay.ErrorMessage), + ErrorType: newText(relay.ErrorType), + ErrorSource: newNullErrorSourcesEnum(ErrorSourcesEnum(relay.ErrorSource)), + RelayRoundtripTime: relay.RelayRoundtripTime, + RelayChainMethodIds: strings.Join(relay.RelayChainMethodIDs, chainMethodIDSeparator), + RelayDataSize: int32(relay.RelayDataSize), + RelayPortalTripTime: relay.RelayPortalTripTime, + RelayNodeTripTime: relay.RelayNodeTripTime, + RelayUrlIsPublicEndpoint: relay.RelayURLIsPublicEndpoint, + PortalRegionName: relay.PortalRegionName, + IsAltruistRelay: relay.IsAltruistRelay, + RequestID: relay.RequestID, + PoktTxID: newText(relay.PoktTxID), + IsUserRelay: relay.IsUserRelay, + GigastakeAppID: newText(relay.GigastakeAppID), + CreatedAt: newTimestamp(createdAt), + UpdatedAt: newTimestamp(createdAt), + BlockingPlugin: newText(relay.BlockingPlugin), + }) + } + + _, err := d.InsertRelays(ctx, relayParams) + + return err +} + +func (d *PostgresDriver) ReadRelay(ctx context.Context, relayID int) (types.Relay, error) { + relay, err := d.SelectRelay(ctx, int64(relayID)) + if err != nil { + return types.Relay{}, err + } + + return types.Relay{ + RelayID: int(relay.ID), + PoktChainID: relay.PoktChainID, + EndpointID: relay.EndpointID, + SessionKey: relay.SessionKey, + ProtocolAppPublicKey: relay.ProtocolAppPublicKey, + RelaySourceURL: relay.RelaySourceUrl.String, + PoktNodeAddress: relay.PoktNodeAddress.String, + PoktNodeDomain: relay.PoktNodeDomain.String, + PoktNodePublicKey: relay.PoktNodePublicKey.String, + RelayStartDatetime: relay.RelayStartDatetime.Time, + RelayReturnDatetime: relay.RelayReturnDatetime.Time, + IsError: relay.IsError, + ErrorCode: int(relay.ErrorCode.Int32), + ErrorName: relay.ErrorName.String, + ErrorMessage: relay.ErrorMessage.String, + ErrorType: relay.ErrorType.String, + ErrorSource: types.ErrorSource(relay.ErrorSource.ErrorSourcesEnum), + RelayRoundtripTime: relay.RelayRoundtripTime, + RelayChainMethodIDs: strings.Split(relay.RelayChainMethodIds, ","), + RelayDataSize: int(relay.RelayDataSize), + RelayPortalTripTime: relay.RelayPortalTripTime, + RelayNodeTripTime: relay.RelayNodeTripTime, + RelayURLIsPublicEndpoint: relay.RelayUrlIsPublicEndpoint, + PortalRegionName: relay.PortalRegionName, + IsAltruistRelay: relay.IsAltruistRelay, + RequestID: relay.RequestID, + IsUserRelay: relay.IsUserRelay, + PoktTxID: relay.PoktTxID.String, + GigastakeAppID: relay.GigastakeAppID.String, + CreatedAt: relay.CreatedAt.Time, + UpdatedAt: relay.UpdatedAt.Time, + Session: types.PocketSession{ + SessionKey: relay.SessionKey, + SessionHeight: int(relay.SessionHeight), + CreatedAt: relay.CreatedAt_2.Time, + UpdatedAt: relay.UpdatedAt_2.Time, + }, + Region: types.PortalRegion{ + PortalRegionName: relay.PortalRegionName, + }, + }, nil +} diff --git a/postgres-driver/relay_test.go b/postgres-driver/relay_test.go new file mode 100644 index 0000000..044b921 --- /dev/null +++ b/postgres-driver/relay_test.go @@ -0,0 +1,137 @@ +package postgresdriver + +import ( + "context" + "time" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +func (ts *PGDriverTestSuite) TestPostgresDriver_WriteRelay() { + tests := []struct { + name string + relay types.Relay + err error + }{ + { + name: "Success", + relay: types.Relay{ + PoktChainID: "21", + EndpointID: "21", + SessionKey: ts.firstRelay.SessionKey, + ProtocolAppPublicKey: "21", + RelaySourceURL: "pablo.com", + PoktNodeAddress: "21", + PoktNodeDomain: "pablos.com", + PoktNodePublicKey: "aaa", + RelayStartDatetime: time.Now(), + RelayReturnDatetime: time.Now(), + RelayRoundtripTime: 1, + RelayChainMethodIDs: []string{"get_height"}, + RelayDataSize: 21, + RelayPortalTripTime: 21, + RelayNodeTripTime: 21, + RelayURLIsPublicEndpoint: false, + PortalRegionName: ts.firstRelay.PortalRegionName, + IsAltruistRelay: false, + IsUserRelay: false, + RequestID: "21", + }, + err: nil, + }, + { + name: "Success error relay", + relay: types.Relay{ + IsError: true, + ErrorCode: 21, + ErrorName: "favorite number", + ErrorMessage: "just Pablo can use it", + ErrorType: "chain_check", + ErrorSource: "internal", + PortalRegionName: ts.firstRelay.PortalRegionName, + RelayStartDatetime: time.Now(), + RelayReturnDatetime: time.Now(), + }, + err: nil, + }, + } + for _, tt := range tests { + ts.Run(tt.name, func() { + ts.Equal(ts.driver.WriteRelay(context.Background(), tt.relay), tt.err) + }) + } +} + +func (ts *PGDriverTestSuite) TestPostgresDriver_WriteRelays() { + var relays []*types.Relay + for i := 0; i < 1000; i++ { + relays = append(relays, &types.Relay{ + PoktChainID: "21", + EndpointID: "21", + SessionKey: ts.firstRelay.SessionKey, + ProtocolAppPublicKey: "21", + RelaySourceURL: "pablo.com", + PoktNodeAddress: "21", + PoktNodeDomain: "pablos.com", + PoktNodePublicKey: "aaa", + RelayStartDatetime: time.Now(), + RelayReturnDatetime: time.Now(), + IsError: true, + ErrorCode: 21, + ErrorName: "favorite number", + ErrorMessage: "just Pablo can use it", + ErrorType: "chain_check", + ErrorSource: "internal", + RelayRoundtripTime: 1, + RelayChainMethodIDs: []string{"get_height", "get_balance"}, + RelayDataSize: 21, + RelayPortalTripTime: 21, + RelayNodeTripTime: 21, + RelayURLIsPublicEndpoint: false, + PortalRegionName: ts.firstRelay.PortalRegionName, + IsAltruistRelay: false, + IsUserRelay: false, + RequestID: "21", + }) + } + + tests := []struct { + name string + relays []*types.Relay + err error + }{ + { + name: "Success", + relays: relays, + err: nil, + }, + } + for _, tt := range tests { + ts.Run(tt.name, func() { + ts.Equal(ts.driver.WriteRelays(context.Background(), tt.relays), tt.err) + }) + } +} + +func (ts *PGDriverTestSuite) TestPostgresDriver_ReadRelay() { + tests := []struct { + name string + relayID int + expRelay types.Relay + err error + }{ + { + name: "Success", + relayID: ts.firstRelay.RelayID, + expRelay: ts.firstRelay, + err: nil, + }, + } + for _, tt := range tests { + ts.Run(tt.name, func() { + relay, err := ts.driver.ReadRelay(context.Background(), tt.relayID) + ts.Equal(err, tt.err) + ts.Equal(relay, tt.expRelay) + }) + } +} diff --git a/postgres-driver/service_record.go b/postgres-driver/service_record.go new file mode 100644 index 0000000..ad95a58 --- /dev/null +++ b/postgres-driver/service_record.go @@ -0,0 +1,96 @@ +package postgresdriver + +import ( + "context" + "time" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +func (d *PostgresDriver) WriteServiceRecord(ctx context.Context, serviceRecord types.ServiceRecord) error { + createdAt := time.Now() + + _, err := d.InsertServiceRecords(ctx, []InsertServiceRecordsParams{ + { + NodePublicKey: serviceRecord.NodePublicKey, + PoktChainID: serviceRecord.PoktChainID, + SessionKey: serviceRecord.SessionKey, + RequestID: serviceRecord.RequestID, + PortalRegionName: serviceRecord.PortalRegionName, + Latency: serviceRecord.Latency, + Tickets: int32(serviceRecord.Tickets), + Result: serviceRecord.Result, + Available: serviceRecord.Available, + Successes: int32(serviceRecord.Successes), + Failures: int32(serviceRecord.Failures), + P90SuccessLatency: serviceRecord.P90SuccessLatency, + MedianSuccessLatency: serviceRecord.MedianSuccessLatency, + WeightedSuccessLatency: serviceRecord.WeightedSuccessLatency, + SuccessRate: serviceRecord.SuccessRate, + CreatedAt: newTimestamp(createdAt), + UpdatedAt: newTimestamp(createdAt), + }, + }) + + return err +} + +func (d *PostgresDriver) WriteServiceRecords(ctx context.Context, serviceRecords []*types.ServiceRecord) error { + createdAt := time.Now() + + serviceRecordParams := make([]InsertServiceRecordsParams, 0, len(serviceRecords)) + + for _, serviceRecord := range serviceRecords { + serviceRecordParams = append(serviceRecordParams, InsertServiceRecordsParams{ + NodePublicKey: serviceRecord.NodePublicKey, + PoktChainID: serviceRecord.PoktChainID, + SessionKey: serviceRecord.SessionKey, + RequestID: serviceRecord.RequestID, + PortalRegionName: serviceRecord.PortalRegionName, + Latency: serviceRecord.Latency, + Tickets: int32(serviceRecord.Tickets), + Result: serviceRecord.Result, + Available: serviceRecord.Available, + Successes: int32(serviceRecord.Successes), + Failures: int32(serviceRecord.Failures), + P90SuccessLatency: serviceRecord.P90SuccessLatency, + MedianSuccessLatency: serviceRecord.MedianSuccessLatency, + WeightedSuccessLatency: serviceRecord.WeightedSuccessLatency, + SuccessRate: serviceRecord.SuccessRate, + CreatedAt: newTimestamp(createdAt), + UpdatedAt: newTimestamp(createdAt), + }) + } + + _, err := d.InsertServiceRecords(ctx, serviceRecordParams) + + return err +} + +func (d *PostgresDriver) ReadServiceRecord(ctx context.Context, serviceRecordID int) (types.ServiceRecord, error) { + serviceRecord, err := d.SelectServiceRecord(ctx, int64(serviceRecordID)) + if err != nil { + return types.ServiceRecord{}, err + } + + return types.ServiceRecord{ + NodePublicKey: serviceRecord.NodePublicKey, + PoktChainID: serviceRecord.PoktChainID, + ServiceRecordID: int(serviceRecord.ID), + SessionKey: serviceRecord.SessionKey, + RequestID: serviceRecord.RequestID, + PortalRegionName: serviceRecord.PortalRegionName, + Latency: serviceRecord.Latency, + Tickets: int(serviceRecord.Tickets), + Result: serviceRecord.Result, + Available: serviceRecord.Available, + Successes: int(serviceRecord.Successes), + Failures: int(serviceRecord.Failures), + P90SuccessLatency: serviceRecord.P90SuccessLatency, + MedianSuccessLatency: serviceRecord.MedianSuccessLatency, + WeightedSuccessLatency: serviceRecord.WeightedSuccessLatency, + SuccessRate: serviceRecord.SuccessRate, + CreatedAt: serviceRecord.CreatedAt.Time, + UpdatedAt: serviceRecord.UpdatedAt.Time, + }, nil +} diff --git a/postgres-driver/service_record_test.go b/postgres-driver/service_record_test.go new file mode 100644 index 0000000..208ae4e --- /dev/null +++ b/postgres-driver/service_record_test.go @@ -0,0 +1,99 @@ +package postgresdriver + +import ( + "context" + + "github.com/pokt-foundation/transaction-http-db/types" +) + +func (ts *PGDriverTestSuite) TestPostgresDriver_WriteServiceRecord() { + tests := []struct { + name string + serviceRecord types.ServiceRecord + err error + }{ + { + name: "Success", + serviceRecord: types.ServiceRecord{ + NodePublicKey: "21", + PoktChainID: "21", + SessionKey: ts.firstServiceRecord.SessionKey, + RequestID: "21", + PortalRegionName: ts.firstServiceRecord.PortalRegionName, + Latency: 21.07, + Tickets: 2, + Result: "a", + Available: true, + Successes: 21, + Failures: 7, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + }, + err: nil, + }, + } + for _, tt := range tests { + ts.Equal(ts.driver.WriteServiceRecord(context.Background(), tt.serviceRecord), tt.err) + } +} + +func (ts *PGDriverTestSuite) TestPostgresDriver_WriteServiceRecords() { + var serviceRecords []*types.ServiceRecord + for i := 0; i < 1000; i++ { + serviceRecords = append(serviceRecords, &types.ServiceRecord{ + NodePublicKey: "21", + PoktChainID: "21", + SessionKey: ts.firstServiceRecord.SessionKey, + RequestID: "21", + PortalRegionName: ts.firstServiceRecord.PortalRegionName, + Latency: 21.07, + Tickets: 2, + Result: "a", + Available: true, + Successes: 21, + Failures: 7, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + }) + } + + tests := []struct { + name string + serviceRecords []*types.ServiceRecord + err error + }{ + { + name: "Success", + serviceRecords: serviceRecords, + err: nil, + }, + } + for _, tt := range tests { + ts.Equal(ts.driver.WriteServiceRecords(context.Background(), tt.serviceRecords), tt.err) + } +} + +func (ts *PGDriverTestSuite) TestPostgresDriver_ReadServiceRecord() { + tests := []struct { + name string + serviceRecordID int + expServiceRecord types.ServiceRecord + err error + }{ + { + name: "Success", + serviceRecordID: ts.firstServiceRecord.ServiceRecordID, + expServiceRecord: ts.firstServiceRecord, + err: nil, + }, + } + for _, tt := range tests { + serviceRecord, err := ts.driver.ReadServiceRecord(context.Background(), tt.serviceRecordID) + ts.Equal(err, tt.err) + ts.Equal(serviceRecord, tt.expServiceRecord) + } +} diff --git a/postgres-driver/sqlc/query.sql b/postgres-driver/sqlc/query.sql new file mode 100644 index 0000000..466c9f6 --- /dev/null +++ b/postgres-driver/sqlc/query.sql @@ -0,0 +1,180 @@ +-- name: InsertRelays :copyfrom +INSERT INTO relay ( + pokt_chain_id, + endpoint_id, + session_key, + protocol_app_public_key, + relay_source_url, + pokt_node_address, + pokt_node_domain, + pokt_node_public_key, + relay_start_datetime, + relay_return_datetime, + is_error, + error_code, + error_name, + error_message, + error_source, + error_type, + relay_roundtrip_time, + relay_chain_method_ids, + relay_data_size, + relay_portal_trip_time, + relay_node_trip_time, + relay_url_is_public_endpoint, + portal_region_name, + is_altruist_relay, + is_user_relay, + request_id, + pokt_tx_id, + gigastake_app_id, + created_at, + updated_at, + blocking_plugin + ) +VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16, + $17, + $18, + $19, + $20, + $21, + $22, + $23, + $24, + $25, + $26, + $27, + $28, + $29, + $30, + $31 + ); +-- name: InsertServiceRecords :copyfrom +INSERT INTO service_record ( + node_public_key, + pokt_chain_id, + session_key, + request_id, + portal_region_name, + latency, + tickets, + result, + available, + successes, + failures, + p90_success_latency, + median_success_latency, + weighted_success_latency, + success_rate, + created_at, + updated_at + ) +VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16, + $17 + ); +-- name: InsertPocketSession :exec +INSERT INTO pocket_session ( + session_key, + session_height, + portal_region_name, + created_at, + updated_at + ) +VALUES ($1, $2, $3, $4, $5); +-- name: InsertPortalRegion :exec +INSERT INTO portal_region (portal_region_name) +VALUES ($1); +-- name: SelectRelay :one +SELECT r.id, + r.pokt_chain_id, + r.endpoint_id, + r.session_key, + r.protocol_app_public_key, + r.relay_source_url, + r.pokt_node_address, + r.pokt_node_domain, + r.pokt_node_public_key, + r.relay_start_datetime, + r.relay_return_datetime, + r.is_error, + r.error_code, + r.error_name, + r.error_message, + r.error_source, + r.error_type, + r.relay_roundtrip_time, + r.relay_chain_method_ids, + r.relay_data_size, + r.relay_portal_trip_time, + r.relay_node_trip_time, + r.relay_url_is_public_endpoint, + r.is_altruist_relay, + r.is_user_relay, + r.request_id, + r.pokt_tx_id, + r.gigastake_app_id, + r.created_at, + r.updated_at, + r.blocking_plugin, + ps.session_key, + ps.session_height, + ps.created_at, + ps.updated_at, + pr.portal_region_name +FROM relay r + INNER JOIN pocket_session ps ON ps.session_key = r.session_key + INNER JOIN portal_region pr ON pr.portal_region_name = r.portal_region_name +WHERE r.id = $1; +-- name: SelectServiceRecord :one +SELECT id, + node_public_key, + pokt_chain_id, + session_key, + request_id, + portal_region_name, + latency, + tickets, + result, + available, + successes, + failures, + p90_success_latency, + median_success_latency, + weighted_success_latency, + success_rate, + created_at, + updated_at +FROM service_record +WHERE id = $1; diff --git a/postgres-driver/sqlc/schema.sql b/postgres-driver/sqlc/schema.sql new file mode 100644 index 0000000..07d4af5 --- /dev/null +++ b/postgres-driver/sqlc/schema.sql @@ -0,0 +1,112 @@ +CREATE TYPE error_sources_enum AS ENUM ('internal', 'external'); +CREATE TABLE pocket_session ( + id bigint NOT NULL GENERATED ALWAYS AS IDENTITY, + session_key char(44) NOT NULL UNIQUE, + session_height integer NOT NULL, + portal_region_name varchar NOT NULL, + created_at timestamp NOT NULL, + updated_at timestamp NOT NULL, + CONSTRAINT pk_tbl_0 PRIMARY KEY (id, portal_region_name) +); +CREATE TABLE portal_region ( + portal_region_name varchar NOT NULL, + CONSTRAINT pk_portal_region PRIMARY KEY (portal_region_name) +); +CREATE TABLE relay ( + id bigint NOT NULL GENERATED ALWAYS AS IDENTITY, + pokt_chain_id char(4) NOT NULL, + endpoint_id varchar NOT NULL, + session_key char(44) NOT NULL, + protocol_app_public_key char(64) NOT NULL, + relay_source_url varchar, + pokt_node_address char(40), + pokt_node_domain varchar, + pokt_node_public_key char(64), + relay_start_datetime timestamp NOT NULL, + relay_return_datetime timestamp NOT NULL, + is_error boolean NOT NULL, + error_code integer, + error_name varchar, + error_message varchar, + error_source error_sources_enum, + error_type varchar, + relay_roundtrip_time float NOT NULL, + relay_chain_method_ids varchar NOT NULL, + relay_data_size integer NOT NULL, + relay_portal_trip_time float NOT NULL, + relay_node_trip_time float NOT NULL, + relay_url_is_public_endpoint boolean NOT NULL, + portal_region_name varchar NOT NULL, + is_altruist_relay boolean NOT NULL, + is_user_relay boolean NOT NULL, + request_id varchar NOT NULL, + pokt_tx_id varchar, + gigastake_app_id varchar, + created_at timestamp NOT NULL, + updated_at timestamp NOT NULL, + blocking_plugin varchar, + CONSTRAINT pk_relay PRIMARY KEY (id, portal_region_name) +); +CREATE TABLE service_record ( + id bigint NOT NULL GENERATED ALWAYS AS IDENTITY, + node_public_key char(64) NOT NULL, + pokt_chain_id char(4) NOT NULL, + session_key char(44) NOT NULL, + request_id varchar NOT NULL, + portal_region_name varchar NOT NULL, + latency float NOT NULL, + tickets integer NOT NULL, + result varchar NOT NULL, + available boolean NOT NULL, + successes integer NOT NULL, + failures integer NOT NULL, + p90_success_latency float NOT NULL, + median_success_latency float NOT NULL, + weighted_success_latency float NOT NULL, + success_rate float NOT NULL, + created_at timestamp NOT NULL, + updated_at timestamp NOT NULL, + CONSTRAINT pk_service_record PRIMARY KEY (id, portal_region_name) +); +ALTER TABLE relay +ADD CONSTRAINT fk_relay_portal_region FOREIGN KEY (portal_region_name) REFERENCES portal_region(portal_region_name); +ALTER TABLE relay +ADD CONSTRAINT fk_relay_session FOREIGN KEY (session_key) REFERENCES pocket_session(session_key); +ALTER TABLE service_record +ADD CONSTRAINT fk_service_region_portal_region FOREIGN KEY (portal_region_name) REFERENCES portal_region(portal_region_name); +ALTER TABLE service_record +ADD CONSTRAINT fk_service_record_session FOREIGN KEY (session_key) REFERENCES pocket_session(session_key); +ALTER TABLE pocket_session +ADD CONSTRAINT fk_pocket_session_portal_region FOREIGN KEY (portal_region_name) REFERENCES portal_region(portal_region_name); +INSERT INTO portal_region (portal_region_name) +VALUES ('europe-west3'), + ('europe-north1'), + ('europe-west8'), + ('europe-southwest1'), + ('europe-west2'), + ('europe-west9'), + ('us-east4'), + ('us-east5'), + ('us-west2'), + ('us-west1'), + ('northamerica-northeast2'), + ('asia-east2'), + ('asia-northeast1'), + ('asia-northeast3'), + ('asia-south1'), + ('asia-southeast1'), + ('australia-southeast1'); +INSERT INTO pocket_session ( + session_key, + session_height, + portal_region_name, + created_at, + updated_at + ) +VALUES ( + '', + 1, + 'europe-west3', + TIMESTAMP '1970-01-01 00:00:00', + TIMESTAMP '1970-01-01 00:00:00' + ); diff --git a/postgres-driver/sqlc/sqlc.yaml b/postgres-driver/sqlc/sqlc.yaml new file mode 100644 index 0000000..1f9aedd --- /dev/null +++ b/postgres-driver/sqlc/sqlc.yaml @@ -0,0 +1,17 @@ +version: "2" + +sql: + - schema: "schema.sql" + engine: "postgresql" + queries: "query.sql" + gen: + go: + package: "postgresdriver" + sql_package: "pgx/v5" + out: ".." + json_tags_case_style: camel + emit_json_tags: true + output_db_file_name: db.generated.go + output_models_file_name: models.generated.go + output_querier_file_name: querier.generated.go + output_files_suffix: .generated diff --git a/postgres-driver/suite_test.go b/postgres-driver/suite_test.go new file mode 100644 index 0000000..251eeb1 --- /dev/null +++ b/postgres-driver/suite_test.go @@ -0,0 +1,113 @@ +package postgresdriver + +import ( + "context" + "testing" + "time" + + "github.com/pokt-foundation/transaction-http-db/types" + + "github.com/stretchr/testify/suite" +) + +const ( + connectionString = "postgres://postgres:pgpassword@localhost:5432/postgres?sslmode=disable" // pragma: allowlist secret +) + +type PGDriverTestSuite struct { + suite.Suite + connectionString string + driver *PostgresDriver + + // Records inserted on setup for testing purposes + firstRelay types.Relay + firstServiceRecord types.ServiceRecord +} + +func Test_RunPGDriverSuite(t *testing.T) { + if testing.Short() { + t.Skip("skipping driver integration test") + } + + testSuite := new(PGDriverTestSuite) + testSuite.connectionString = connectionString + + suite.Run(t, testSuite) +} + +// SetupSuite runs before each test suite run +func (ts *PGDriverTestSuite) SetupSuite() { + ts.NoError(ts.initPostgresDriver()) + + ts.NoError(ts.driver.WriteRegion(context.Background(), types.PortalRegion{ + PortalRegionName: "La Colombia", + })) + + ts.NoError(ts.driver.WriteSession(context.Background(), types.PocketSession{ + SessionKey: "22", + SessionHeight: 22, + PortalRegionName: "La Colombia", + })) + + ts.NoError(ts.driver.WriteRelay(context.Background(), types.Relay{ + PoktChainID: "21", + EndpointID: "21", + SessionKey: "22", + ProtocolAppPublicKey: "21", + RelaySourceURL: "pablo.com", + PoktNodeAddress: "21", + PoktNodeDomain: "pablos.com", + PoktNodePublicKey: "aaa", + RelayStartDatetime: time.Now(), + RelayReturnDatetime: time.Now(), + RelayRoundtripTime: 1, + RelayChainMethodIDs: []string{"get_height"}, + RelayDataSize: 21, + RelayPortalTripTime: 21, + RelayNodeTripTime: 21, + RelayURLIsPublicEndpoint: false, + PortalRegionName: "La Colombia", + IsAltruistRelay: false, + IsUserRelay: false, + RequestID: "21", + })) + + ts.NoError(ts.driver.WriteServiceRecord(context.Background(), types.ServiceRecord{ + NodePublicKey: "21", + PoktChainID: "21", + SessionKey: "22", + RequestID: "21", + PortalRegionName: "La Colombia", + Latency: 21.07, + Tickets: 2, + Result: "a", + Available: true, + Successes: 21, + Failures: 7, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + })) + + firstRelay, err := ts.driver.ReadRelay(context.Background(), 1) + ts.NoError(err) + + firstServiceRecord, err := ts.driver.ReadServiceRecord(context.Background(), 1) + ts.NoError(err) + + ts.firstRelay = firstRelay + ts.firstServiceRecord = firstServiceRecord +} + +// Initializes a real instance of the Postgres driver that connects to the test Postgres Docker container +func (ts *PGDriverTestSuite) initPostgresDriver() error { + driver, _, err := NewPostgresDriver(ts.connectionString) + if err != nil { + return err + } + + ts.driver = driver + + return nil +} diff --git a/router/mock_driver.go b/router/mock_driver.go index e96e2a4..ff27631 100644 --- a/router/mock_driver.go +++ b/router/mock_driver.go @@ -1,11 +1,11 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.32.3. DO NOT EDIT. package router import ( context "context" - types "github.com/pokt-foundation/transaction-db/types" + types "github.com/pokt-foundation/transaction-http-db/types" mock "github.com/stretchr/testify/mock" ) @@ -19,13 +19,16 @@ func (_m *MockDriver) ReadRelay(ctx context.Context, relayID int) (types.Relay, ret := _m.Called(ctx, relayID) var r0 types.Relay + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int) (types.Relay, error)); ok { + return rf(ctx, relayID) + } if rf, ok := ret.Get(0).(func(context.Context, int) types.Relay); ok { r0 = rf(ctx, relayID) } else { r0 = ret.Get(0).(types.Relay) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, int) error); ok { r1 = rf(ctx, relayID) } else { @@ -40,13 +43,16 @@ func (_m *MockDriver) ReadServiceRecord(ctx context.Context, serviceRecordID int ret := _m.Called(ctx, serviceRecordID) var r0 types.ServiceRecord + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int) (types.ServiceRecord, error)); ok { + return rf(ctx, serviceRecordID) + } if rf, ok := ret.Get(0).(func(context.Context, int) types.ServiceRecord); ok { r0 = rf(ctx, serviceRecordID) } else { r0 = ret.Get(0).(types.ServiceRecord) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, int) error); ok { r1 = rf(ctx, serviceRecordID) } else { @@ -111,3 +117,17 @@ func (_m *MockDriver) WriteSession(ctx context.Context, session types.PocketSess return r0 } + +// NewMockDriver creates a new instance of MockDriver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockDriver(t interface { + mock.TestingT + Cleanup(func()) +}) *MockDriver { + mock := &MockDriver{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/router/router.go b/router/router.go index 0a82692..463ff97 100644 --- a/router/router.go +++ b/router/router.go @@ -10,8 +10,8 @@ import ( "sync" "github.com/gorilla/mux" - "github.com/pokt-foundation/transaction-db/types" "github.com/pokt-foundation/transaction-http-db/batch" + "github.com/pokt-foundation/transaction-http-db/types" jsonresponse "github.com/pokt-foundation/utils-go/json-response" "go.uber.org/zap" "golang.org/x/sync/errgroup" diff --git a/router/router_test.go b/router/router_test.go index 966c2fd..95195b2 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -11,8 +11,8 @@ import ( "testing" "time" - "github.com/pokt-foundation/transaction-db/types" "github.com/pokt-foundation/transaction-http-db/batch" + "github.com/pokt-foundation/transaction-http-db/types" mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" diff --git a/testdata/docker-compose.test.yml b/testdata/docker-compose.test.yml new file mode 100644 index 0000000..226125d --- /dev/null +++ b/testdata/docker-compose.test.yml @@ -0,0 +1,19 @@ +# This Dockerfile only used for setting up a local Postgres DB for testing +version: "3" + +services: + test-database: + build: + context: .. + dockerfile: ./Dockerfile.testdb + container_name: test-database + restart: always + ports: + - 5432:5432 + environment: + POSTGRES_PASSWORD: pgpassword + POSTGRES_DB: postgres + healthcheck: + test: pg_isready -U postgres + interval: 5s + retries: 3 diff --git a/types/pocket_session.go b/types/pocket_session.go new file mode 100644 index 0000000..1b3b05c --- /dev/null +++ b/types/pocket_session.go @@ -0,0 +1,60 @@ +package types + +import ( + "errors" + "fmt" + "reflect" + "time" +) + +var ( + // this fields shpould be empty because they are set after db record is created + shouldBeEmptySession = map[string]bool{ + "CreatedAt": true, + "UpdatedAt": true, + } + + ErrRepeatedSessionKey = errors.New("repeated session key") +) + +type PocketSession struct { + SessionKey string `json:"sessionKey"` + SessionHeight int `json:"sessionHeight"` + PortalRegionName string `json:"portalRegionName"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +func (ps *PocketSession) Validate() error { + structType := reflect.TypeOf(*ps) + structVal := reflect.ValueOf(*ps) + fieldNum := structVal.NumField() + + // fields are in the order they are declared on the struct + for i := 0; i < fieldNum; i++ { + field := structVal.Field(i) + fieldName := structType.Field(i).Name + + isSet := field.IsValid() && !field.IsZero() + + if isSet { + // shouldBeEmptyFields should never be set + if shouldBeEmptySession[fieldName] { + return fmt.Errorf("%s should not be set", fieldName) + } + } + + if !isSet { + // shouldBeEmptyField can be empty + // bools zero value is false which is a valid value + if shouldBeEmptySession[fieldName] { + continue + } + + // if is not set and the field is none of the special cases it is an error + return fmt.Errorf("%s is not set", fieldName) + } + } + + return nil +} diff --git a/types/pocket_session_test.go b/types/pocket_session_test.go new file mode 100644 index 0000000..eb12879 --- /dev/null +++ b/types/pocket_session_test.go @@ -0,0 +1,70 @@ +package types + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPocketSession_ValidateStruct(t *testing.T) { + c := require.New(t) + + tests := []struct { + name string + serviceRecord PocketSession + err error + }{ + { + name: "Success session", + serviceRecord: PocketSession{ + SessionKey: "key", + SessionHeight: 1, + PortalRegionName: "region", + }, + err: nil, + }, + { + name: "failure session no key", + serviceRecord: PocketSession{ + SessionKey: "", + SessionHeight: 1, + PortalRegionName: "region", + }, + err: errors.New("SessionKey is not set"), + }, + { + name: "failure session no height", + serviceRecord: PocketSession{ + SessionKey: "key", + SessionHeight: 0, + PortalRegionName: "region", + }, + err: errors.New("SessionHeight is not set"), + }, + { + name: "failure session no region", + serviceRecord: PocketSession{ + SessionKey: "key", + SessionHeight: 1, + PortalRegionName: "", + }, + err: errors.New("PortalRegionName is not set"), + }, + { + name: "failure session createdAt should not be set", + serviceRecord: PocketSession{ + SessionKey: "key", + SessionHeight: 1, + PortalRegionName: "region", + CreatedAt: time.Now(), + }, + err: errors.New("CreatedAt should not be set"), + }, + } + + for _, tt := range tests { + c.Equal(tt.err, tt.serviceRecord.Validate()) + } +} diff --git a/types/portal_region.go b/types/portal_region.go new file mode 100644 index 0000000..ae4d948 --- /dev/null +++ b/types/portal_region.go @@ -0,0 +1,5 @@ +package types + +type PortalRegion struct { + PortalRegionName string `json:"portalRegionName"` +} diff --git a/types/relay.go b/types/relay.go new file mode 100644 index 0000000..ba1d928 --- /dev/null +++ b/types/relay.go @@ -0,0 +1,55 @@ +package types + +import ( + "time" +) + +type ErrorSource string + +const ( + ErrorSourceInternal ErrorSource = "internal" + ErrorSourceExternal ErrorSource = "external" +) + +// TODO: consider removing this type and use portal types instead +type Relay struct { + RelayID int `json:"relayID"` + PoktChainID string `json:"poktChainID"` + EndpointID string `json:"endpointID"` + SessionKey string `json:"sessionKey"` + ProtocolAppPublicKey string `json:"protocolAppPublicKey"` + RelaySourceURL string `json:"relaySourceUrl"` + PoktNodeAddress string `json:"poktNodeAddress"` + PoktNodeDomain string `json:"poktNodeDomain"` + PoktNodePublicKey string `json:"poktNodePublicKey"` + RelayStartDatetime time.Time `json:"relayStartDatetime"` + RelayReturnDatetime time.Time `json:"relayReturnDatetime"` + IsError bool `json:"isError"` + ErrorCode int `json:"errorCode,omitempty"` + ErrorName string `json:"errorName,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + ErrorType string `json:"errorType,omitempty"` + ErrorSource ErrorSource `json:"errorSource,omitempty"` + RelayRoundtripTime float64 `json:"relayRoundtripTime"` + RelayChainMethodIDs []string `json:"relayChainMethodID"` + RelayDataSize int `json:"relayDataSize"` + RelayPortalTripTime float64 `json:"relayPortalTripTime"` + RelayNodeTripTime float64 `json:"relayNodeTripTime"` + RelayURLIsPublicEndpoint bool `json:"relayUrlIsPublicEndpoint"` + PortalRegionName string `json:"portalRegionName"` + IsAltruistRelay bool `json:"isAltruistRelay"` + IsUserRelay bool `json:"isUserRelay"` + RequestID string `json:"requestID"` + PoktTxID string `json:"poktTxID"` + GigastakeAppID string `json:"gigastakeAppID"` + Session PocketSession `json:"session"` + Region PortalRegion `json:"region"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + BlockingPlugin string `json:"blockingPlugin"` +} + +func (r *Relay) Validate() (err error) { + // TODO: remove all calls to Validate from client side. Missing fields should be logged as warning, and fixed in the portal or txdb-reporter. + return nil +} diff --git a/types/service_record.go b/types/service_record.go new file mode 100644 index 0000000..93287ae --- /dev/null +++ b/types/service_record.go @@ -0,0 +1,80 @@ +package types + +import ( + "fmt" + "reflect" + "time" +) + +var ( + // this fields shpould be empty because they are set after db record is created + shouldBeEmptyServiceRecordField = map[string]bool{ + "ServiceRecordID": true, + "CreatedAt": true, + "UpdatedAt": true, + } + + serviceRecordOptionalFields = map[string]bool{ + "Latency": true, + "Result": true, + "Successes": true, + "Failures": true, + } +) + +type ServiceRecord struct { + ServiceRecordID int `json:"serviceRecordID"` + NodePublicKey string `json:"nodePublicKey"` + PoktChainID string `json:"poktChainID"` + SessionKey string `json:"sessionKey"` + RequestID string `json:"requestID"` + PortalRegionName string `json:"portalRegionName"` + Latency float64 `json:"latency"` + Tickets int `json:"tickets"` + Result string `json:"result"` + Available bool `json:"available"` + Successes int `json:"successes"` + Failures int `json:"failures"` + P90SuccessLatency float64 `json:"p90SuccessLatency"` + MedianSuccessLatency float64 `json:"medianSuccessLatency"` + WeightedSuccessLatency float64 `json:"weightedSuccessLatency"` + SuccessRate float64 `json:"successRate"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +func (sr *ServiceRecord) Validate() (err error) { + structType := reflect.TypeOf(*sr) + structVal := reflect.ValueOf(*sr) + fieldNum := structVal.NumField() + + // fields are in the order they are declared on the struct + for i := 0; i < fieldNum; i++ { + field := structVal.Field(i) + fieldName := structType.Field(i).Name + + isSet := field.IsValid() && !field.IsZero() + + if isSet { + // shouldBeEmptyFields should never be set + if shouldBeEmptyServiceRecordField[fieldName] { + return fmt.Errorf("%s should not be set", fieldName) + } + } + + if !isSet { + // shouldBeEmptyField can be empty + // bools zero value is false which is a valid value + if shouldBeEmptyServiceRecordField[fieldName] || + field.Kind() == reflect.Bool || + serviceRecordOptionalFields[fieldName] { + continue + } + + // if is not set and the field is none of the special cases it is an error + return fmt.Errorf("%s is not set", fieldName) + } + } + + return nil +} diff --git a/types/service_record_test.go b/types/service_record_test.go new file mode 100644 index 0000000..a7de01f --- /dev/null +++ b/types/service_record_test.go @@ -0,0 +1,103 @@ +package types + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestServiceRecord_ValidateStruct(t *testing.T) { + c := require.New(t) + + tests := []struct { + name string + serviceRecord ServiceRecord + err error + }{ + { + name: "Success service record", + serviceRecord: ServiceRecord{ + SessionKey: "21", + NodePublicKey: "21", + PoktChainID: "21", + RequestID: "21", + PortalRegionName: "La Colombia", + Latency: 21.07, + Tickets: 2, + Result: "a", + Available: true, + Successes: 21, + Failures: 7, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + }, + err: nil, + }, + { + name: "Success service record without optional fields", + serviceRecord: ServiceRecord{ + SessionKey: "21", + NodePublicKey: "21", + PoktChainID: "21", + RequestID: "21", + PortalRegionName: "La Colombia", + Tickets: 2, + Available: true, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + }, + err: nil, + }, + { + name: "Failure service record id set", + serviceRecord: ServiceRecord{ + ServiceRecordID: 21, + SessionKey: "21", + NodePublicKey: "21", + PoktChainID: "21", + RequestID: "21", + PortalRegionName: "La Colombia", + Latency: 21.07, + Tickets: 2, + Result: "a", + Available: true, + Successes: 21, + Failures: 7, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + }, + err: errors.New("ServiceRecordID should not be set"), + }, + { + name: "Failure service record field not set", + serviceRecord: ServiceRecord{ + NodePublicKey: "21", + PoktChainID: "21", + RequestID: "21", + PortalRegionName: "La Colombia", + Latency: 21.07, + Tickets: 2, + Result: "a", + Available: true, + Successes: 21, + Failures: 7, + P90SuccessLatency: 21.07, + MedianSuccessLatency: 21.07, + WeightedSuccessLatency: 21.07, + SuccessRate: 21, + }, + err: errors.New("SessionKey is not set"), + }, + } + + for _, tt := range tests { + c.Equal(tt.err, tt.serviceRecord.Validate()) + } +}