77import maya
88import requests
99from influxdb import InfluxDBClient
10+ from influxdb_client import InfluxDBClient , Point
11+ from influxdb_client .client .write_api import SYNCHRONOUS
1012
1113
1214def retrieve_paginated_data (
@@ -31,7 +33,7 @@ def retrieve_paginated_data(
3133 return results
3234
3335
34- def store_series (connection , series , metrics , rate_data ):
36+ def store_series (connection , version , org , bucket , series , metrics , rate_data ):
3537
3638 agile_data = rate_data .get ('agile_unit_rates' , [])
3739 agile_rates = {
@@ -114,8 +116,10 @@ def tags_for_measurement(measurement):
114116 }
115117 for measurement in metrics
116118 ]
117- connection .write_points (measurements )
118-
119+ if (version == 2 ):
120+ connection .write (bucket , org , measurements )
121+ elif (version == 1 ):
122+ connection .write_points (measurements )
119123
120124@click .command ()
121125@click .option (
@@ -130,13 +134,28 @@ def cmd(config_file, from_date, to_date):
130134 config = ConfigParser ()
131135 config .read (config_file )
132136
133- influx = InfluxDBClient (
134- host = config .get ('influxdb' , 'host' , fallback = "localhost" ),
135- port = config .getint ('influxdb' , 'port' , fallback = 8086 ),
136- username = config .get ('influxdb' , 'user' , fallback = "" ),
137- password = config .get ('influxdb' , 'password' , fallback = "" ),
138- database = config .get ('influxdb' , 'database' , fallback = "energy" ),
139- )
137+ org = config .get ('influxdb' , 'org' , fallback = "" )
138+ database = config .get ('influxdb' , 'database' , fallback = "energy" )
139+ influx_version = (config .getint ('influxdb' , 'version' , fallback = 2 ))
140+
141+ if (influx_version == 2 ):
142+ influx = InfluxDBClient (
143+ url = config .get ('influxdb' , 'url' , fallback = "http://localhost:8086" ),
144+ token = config .get ('influxdb' , 'token' , fallback = "" ),
145+ org = org ,
146+ )
147+ write_api = influx .write_api (write_options = SYNCHRONOUS )
148+ elif (influx_version == 1 ):
149+ influx = InfluxDBClient (
150+ host = config .get ('influxdb' , 'host' , fallback = "localhost" ),
151+ port = config .getint ('influxdb' , 'port' , fallback = 8086 ),
152+ username = config .get ('influxdb' , 'user' , fallback = "" ),
153+ password = config .get ('influxdb' , 'password' , fallback = "" ),
154+ database = database ,
155+ )
156+ write_api = influx
157+ else :
158+ raise click .ClickException ('Influx version not supported' )
140159
141160 api_key = config .get ('octopus' , 'api_key' )
142161 if not api_key :
@@ -214,7 +233,7 @@ def cmd(config_file, from_date, to_date):
214233 api_key , agile_url , from_iso , to_iso
215234 )
216235 click .echo (f' { len (rate_data ["electricity" ]["agile_unit_rates" ])} rates.' )
217- store_series (influx , 'electricity' , e_consumption , rate_data ['electricity' ])
236+ store_series (write_api , influx_version , org , database , 'electricity' , e_consumption , rate_data ['electricity' ])
218237
219238 click .echo (
220239 f'Retrieving gas data for { from_iso } until { to_iso } ...' ,
@@ -224,8 +243,8 @@ def cmd(config_file, from_date, to_date):
224243 api_key , g_url , from_iso , to_iso
225244 )
226245 click .echo (f' { len (g_consumption )} readings.' )
227- store_series (influx , 'gas' , g_consumption , rate_data ['gas' ])
246+ store_series (write_api , influx_version , org , database , 'gas' , g_consumption , rate_data ['gas' ])
228247
229248
230249if __name__ == '__main__' :
231- cmd ()
250+ cmd ()
0 commit comments