Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions internal/proto/storage.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 17 additions & 18 deletions internal/server/spanner/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package spanner

import (
"encoding/json"
"encoding/base64"
"fmt"

"google.golang.org/protobuf/types/known/structpb"
pb "github.com/datacommonsorg/mixer/internal/proto"
"google.golang.org/protobuf/proto"
)

// Property struct represents a subset of a row in the Edge table.
Expand Down Expand Up @@ -65,25 +66,23 @@ type TimeSeries []*DateValue
// DecodeSpanner decodes the observations field to a TimeSeries value.
// This is inherited from the spanner Decoder interface to decode from a spanner type to a custom type.
// Reference: https://cloud.google.com/go/docs/reference/cloud.google.com/go/spanner/latest#cloud_google_com_go_spanner_Decoder
// Note that the undecoded values are of type ListValue and each element a string value.
// Note that the undecoded value is a base64 encoded string.
func (ts *TimeSeries) DecodeSpanner(val interface{}) (err error) {
listVal, ok := val.(*structpb.ListValue)
if !ok {
return fmt.Errorf("failed to decode TimeSeries: (%v)", val)
obs := &pb.Observations{}
decodedVal, err := base64.StdEncoding.DecodeString(val.(string))
if err != nil {
return fmt.Errorf("failed to decode base64 encoded string: (%v)", err)
}
err = proto.Unmarshal(decodedVal, obs)
if err != nil {
return fmt.Errorf("failed to decode Observations: (%v)", err)
}
*ts = []*DateValue{}
for _, v := range listVal.Values {
var data map[string]string
err := json.Unmarshal([]byte(v.GetStringValue()), &data)
if err != nil {
return fmt.Errorf("failed to decode TimeSeries value: (%v)", v)
}
for date, strVal := range data {
*ts = append(*ts, &DateValue{
Date: date,
Value: strVal,
})
}
for date, value := range obs.Values {
*ts = append(*ts, &DateValue{
Date: date,
Value: value,
})
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/spanner/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ var statements = struct {
variable_measured,
observation_about,
%s,
provenance,
provenance_url AS provenance,
COALESCE(observation_period, '') AS observation_period,
COALESCE(measurement_method, '') AS measurement_method,
COALESCE(unit, '') AS unit,
Expand Down
39 changes: 39 additions & 0 deletions internal/server/spanner/temp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"context"
"fmt"
"log"

"github.com/datacommonsorg/mixer/internal/server/spanner"
)

const spannerInfoYaml = `
project: datcom-store
instance: dc-kg-test
database: dc_graph_stable
`

// This is a temporary program to test proto fields in spanner.
// Usage: go run internal/server/spanner/temp/main.go

func main() {
ctx := context.Background()
client, err := spanner.NewSpannerClient(ctx, spannerInfoYaml)
if err != nil {
log.Fatalf("Failed to create SpannerClient: %v", err)
}
variables := []string{"AirPollutant_Cancer_Risk"}
entities := []string{"geoId/01001", "geoId/02013"}
obs, err := client.GetObservations(ctx, variables, entities, "", false)
if err != nil {
log.Fatalf("Failed to get observations: %v", err)
}
for _, o := range obs {
fmt.Printf("Observations for %s %s:\n", o.VariableMeasured, o.ObservationAbout)
for _, o2 := range o.Observations {
fmt.Printf(" %v %v\n", o2.Date, o2.Value)
}

}
}
14 changes: 14 additions & 0 deletions proto/storage.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package datacommons;

option go_package = "github.com/datacommonsorg/mixer/internal/proto";

// Includes protos that are used in spanner storage.

// Observations represent the observations (time series) stored in the observations column in the Observation table.
message Observations {
// Map from date to value.
// Examples: "2024" -> "123", "2025-05" -> "-456.78"
map<string, string> values = 1;
}