@@ -36,9 +36,12 @@ import (
3636
3737 monitoring "cloud.google.com/go/monitoring/apiv3"
3838 "go.opencensus.io/stats/view"
39+ "go.opencensus.io/tag"
3940 "google.golang.org/api/option"
4041 "google.golang.org/api/support/bundler"
42+ metricpb "google.golang.org/genproto/googleapis/api/metric"
4143 mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
44+ mpb "google.golang.org/genproto/googleapis/monitoring/v3"
4245)
4346
4447// Exporter is the exporter that can be registered to opencensus. An Exporter object must be
@@ -47,12 +50,7 @@ type Exporter struct {
4750 // TODO(lawrencechung): If possible, find a way to not storing ctx in the struct.
4851 ctx context.Context
4952 client * monitoring.MetricClient
50- opts * Options
51-
52- // copy of some option values which may be modified by exporter.
53- getProjectID func (* RowData ) (string , error )
54- onError func (error , ... * RowData )
55- makeResource func (* RowData ) (* mrpb.MonitoredResource , error )
53+ opts Options
5654
5755 // mu protects access to projDataMap
5856 mu sync.Mutex
@@ -67,9 +65,14 @@ type Options struct {
6765 // RPC calls.
6866 ClientOptions []option.ClientOption
6967
70- // options for bundles amortizing export requests. Note that a bundle is created for each
68+ // Options for bundles amortizing export requests. Note that a bundle is created for each
7169 // project. When not provided, default values in bundle package are used.
70+
71+ // BundleDelayThreshold determines the max amount of time the exporter can wait before
72+ // uploading data to the stackdriver.
7273 BundleDelayThreshold time.Duration
74+ // BundleCountThreshold determines how many RowData objects can be buffered before batch
75+ // uploading them to the backend.
7376 BundleCountThreshold int
7477
7578 // Callback functions provided by user.
@@ -85,8 +88,8 @@ type Options struct {
8588 GetProjectID func (* RowData ) (projectID string , err error )
8689 // OnError is used to report any error happened while exporting view data fails. Whenever
8790 // this function is called, it's guaranteed that at least one row data is also passed to
88- // OnError. Row data passed to OnError must not be modified. When OnError is not set, all
89- // errors happened on exporting are ignored.
91+ // OnError. Row data passed to OnError must not be modified and OnError must be
92+ // non-blocking. When OnError is not set, all errors happened on exporting are ignored.
9093 OnError func (error , ... * RowData )
9194 // MakeResource creates monitored resource from RowData. It is guaranteed that only RowData
9295 // that passes GetProjectID will be given to this function. Though not recommended, error
@@ -129,8 +132,8 @@ func defaultMakeResource(rd *RowData) (*mrpb.MonitoredResource, error) {
129132 return & mrpb.MonitoredResource {Type : "global" }, nil
130133}
131134
132- // Following functions are wrapper of functions that may show non-deterministic behavior . Only tests
133- // can modify these functions.
135+ // Following functions are wrapper of functions those will be mocked by tests . Only tests can modify
136+ // these functions.
134137var (
135138 newMetricClient = monitoring .NewMetricClient
136139 createTimeSeries = (* monitoring .MetricClient ).CreateTimeSeries
@@ -141,7 +144,7 @@ var (
141144// NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts
142145// must not be modified at all. ctx will also be used throughout entire exporter operation when
143146// making RPC call.
144- func NewExporter (ctx context.Context , opts * Options ) (* Exporter , error ) {
147+ func NewExporter (ctx context.Context , opts Options ) (* Exporter , error ) {
145148 client , err := newMetricClient (ctx , opts .ClientOptions ... )
146149 if err != nil {
147150 return nil , fmt .Errorf ("failed to create a metric client: %v" , err )
@@ -154,19 +157,14 @@ func NewExporter(ctx context.Context, opts *Options) (*Exporter, error) {
154157 projDataMap : make (map [string ]* projectData ),
155158 }
156159
157- // We don't want to modify user-supplied options, so save default options directly in
158- // exporter.
159- e .getProjectID = defaultGetProjectID
160- if opts .GetProjectID != nil {
161- e .getProjectID = opts .GetProjectID
160+ if e .opts .GetProjectID == nil {
161+ e .opts .GetProjectID = defaultGetProjectID
162162 }
163- e .onError = defaultOnError
164- if opts .OnError != nil {
165- e .onError = opts .OnError
163+ if e .opts .OnError == nil {
164+ e .opts .OnError = defaultOnError
166165 }
167- e .makeResource = defaultMakeResource
168- if opts .MakeResource != nil {
169- e .makeResource = opts .MakeResource
166+ if e .opts .MakeResource == nil {
167+ e .opts .MakeResource = defaultMakeResource
170168 }
171169
172170 return e , nil
@@ -201,12 +199,12 @@ var RowDataNotApplicableError = errors.New("row data is not applicable to the ex
201199
202200// exportRowData exports a single row data.
203201func (e * Exporter ) exportRowData (rd * RowData ) {
204- projID , err := e .getProjectID (rd )
202+ projID , err := e .opts . GetProjectID (rd )
205203 if err != nil {
206204 // We ignore non-applicable RowData.
207205 if err != RowDataNotApplicableError {
208206 newErr := fmt .Errorf ("failed to get project ID on row data with view %s: %v" , rd .View .Name , err )
209- e .onError (newErr , rd )
207+ e .opts . OnError (newErr , rd )
210208 }
211209 return
212210 }
@@ -217,7 +215,7 @@ func (e *Exporter) exportRowData(rd *RowData) {
217215 go pd .uploadRowData (rd )
218216 default :
219217 newErr := fmt .Errorf ("failed to add row data with view %s to bundle for project %s: %v" , rd .View .Name , projID , err )
220- e .onError (newErr , rd )
218+ e .opts . OnError (newErr , rd )
221219 }
222220}
223221
@@ -233,6 +231,23 @@ func (e *Exporter) getProjectData(projectID string) *projectData {
233231 return pd
234232}
235233
234+ func (e * Exporter ) newProjectData (projectID string ) * projectData {
235+ pd := & projectData {
236+ parent : e ,
237+ projectID : projectID ,
238+ }
239+
240+ pd .bndler = newBundler ((* RowData )(nil ), pd .uploadRowData )
241+ // Set options for bundler if they are provided by users.
242+ if 0 < e .opts .BundleDelayThreshold {
243+ pd .bndler .DelayThreshold = e .opts .BundleDelayThreshold
244+ }
245+ if 0 < e .opts .BundleCountThreshold {
246+ pd .bndler .BundleCountThreshold = e .opts .BundleCountThreshold
247+ }
248+ return pd
249+ }
250+
236251// Close flushes and closes the exporter. Close must be called after the exporter is unregistered
237252// and no further calls to ExportView() are made. Once Close() is returned no further access to the
238253// exporter is allowed in any way.
@@ -248,3 +263,42 @@ func (e *Exporter) Close() error {
248263 }
249264 return nil
250265}
266+
267+ // makeTS constructs a time series from a row data.
268+ func (e * Exporter ) makeTS (rd * RowData ) (* mpb.TimeSeries , error ) {
269+ pt := newPoint (rd .View , rd .Row , rd .Start , rd .End )
270+ if pt .Value == nil {
271+ return nil , fmt .Errorf ("inconsistent data found in view %s" , rd .View .Name )
272+ }
273+ resource , err := e .opts .MakeResource (rd )
274+ if err != nil {
275+ return nil , fmt .Errorf ("failed to construct resource of view %s: %v" , rd .View .Name , err )
276+ }
277+ ts := & mpb.TimeSeries {
278+ Metric : & metricpb.Metric {
279+ Type : rd .View .Name ,
280+ Labels : e .makeLabels (rd .Row .Tags ),
281+ },
282+ Resource : resource ,
283+ Points : []* mpb.Point {pt },
284+ }
285+ return ts , nil
286+ }
287+
288+ // makeLables constructs label that's ready for being uploaded to stackdriver.
289+ func (e * Exporter ) makeLabels (tags []tag.Tag ) map [string ]string {
290+ opts := e .opts
291+ labels := make (map [string ]string , len (opts .DefaultLabels )+ len (tags ))
292+ for key , val := range opts .DefaultLabels {
293+ labels [key ] = val
294+ }
295+ // If there's overlap When combining exporter's default label and tags, values in tags win.
296+ for _ , tag := range tags {
297+ labels [tag .Key .Name ()] = tag .Value
298+ }
299+ // Some labels are not for exporting.
300+ for _ , key := range opts .UnexportedLabels {
301+ delete (labels , key )
302+ }
303+ return labels
304+ }
0 commit comments