|
| 1 | +# Views to transform marketplace data in pipeline |
| 2 | + |
| 3 | +import os |
| 4 | + |
| 5 | +from snowflake.core import Root, CreateMode |
| 6 | +from snowflake.snowpark import Session |
| 7 | +from snowflake.core.user_defined_function import ( |
| 8 | + Argument, |
| 9 | + ReturnDataType, |
| 10 | + PythonFunction, |
| 11 | + UserDefinedFunction, |
| 12 | +) |
| 13 | +from snowflake.core.view import View, ViewColumn |
| 14 | + |
| 15 | + |
| 16 | +""" |
| 17 | +To join the flight and location focused tables |
| 18 | +we need to cross the gap between the airport and cities domains. |
| 19 | +For this we make use of a Snowpark Python UDF. |
| 20 | +What's really cool is that Snowpark allows us to define a vectorized UDF |
| 21 | +making the processing super efficient as we don’t have to invoke the |
| 22 | +function on each row individually! |
| 23 | +
|
| 24 | +To compute the mapping between airports and cities, |
| 25 | +we use SnowflakeFile to read a JSON list from the pyairports package. |
| 26 | +The SnowflakeFile class provides dynamic file access, to stream files of any size. |
| 27 | +""" |
| 28 | +map_city_to_airport = UserDefinedFunction( |
| 29 | + name="get_city_for_airport", |
| 30 | + arguments=[Argument(name="iata", datatype="VARCHAR")], |
| 31 | + return_type=ReturnDataType(datatype="VARCHAR"), |
| 32 | + language_config=PythonFunction( |
| 33 | + runtime_version="3.11", packages=["snowflake-snowpark-python"], handler="main" |
| 34 | + ), |
| 35 | + body=""" |
| 36 | +from snowflake.snowpark.files import SnowflakeFile |
| 37 | +from _snowflake import vectorized |
| 38 | +import pandas |
| 39 | +import json |
| 40 | +
|
| 41 | +@vectorized(input=pandas.DataFrame) |
| 42 | +def main(df): |
| 43 | + airport_list = json.loads( |
| 44 | + SnowflakeFile.open("@bronze.raw/airport_list.json", 'r', require_scoped_url = False).read() |
| 45 | + ) |
| 46 | + airports = {airport[3]: airport[1] for airport in airport_list} |
| 47 | + return df[0].apply(lambda iata: airports.get(iata.upper())) |
| 48 | +""", |
| 49 | +) |
| 50 | + |
| 51 | + |
| 52 | +""" |
| 53 | +To mangle the data into a more usable form, |
| 54 | +we make use of views to not materialize the marketplace data |
| 55 | +and avoid the corresponding storage costs. |
| 56 | +""" |
| 57 | + |
| 58 | +pipeline = [ |
| 59 | + # We are interested in the per seat carbon emissions. |
| 60 | + # To obtain these, we need to divide the emission data by the number of seats in the airplane. |
| 61 | + View( |
| 62 | + name="flight_emissions", |
| 63 | + columns=[ |
| 64 | + ViewColumn(name="departure_airport"), |
| 65 | + ViewColumn(name="arrival_airport"), |
| 66 | + ViewColumn(name="co2_emissions_kg_per_person"), |
| 67 | + ], |
| 68 | + query=""" |
| 69 | + select |
| 70 | + departure_airport, |
| 71 | + arrival_airport, |
| 72 | + avg(estimated_co2_total_tonnes / seats) * 1000 as co2_emissions_kg_per_person |
| 73 | + from oag_flight_emissions_data_sample.public.estimated_emissions_schedules_sample |
| 74 | + where seats != 0 and estimated_co2_total_tonnes is not null |
| 75 | + group by departure_airport, arrival_airport |
| 76 | + """, |
| 77 | + ), |
| 78 | + # To avoid unreliable flight connections, we compute the fraction of flights that arrive |
| 79 | + # early or on time from the flight status data provided by OAG. |
| 80 | + View( |
| 81 | + name="flight_punctuality", |
| 82 | + columns=[ |
| 83 | + ViewColumn(name="departure_iata_airport_code"), |
| 84 | + ViewColumn(name="arrival_iata_airport_code"), |
| 85 | + ViewColumn(name="punctual_pct"), |
| 86 | + ], |
| 87 | + query=""" |
| 88 | + select |
| 89 | + departure_iata_airport_code, |
| 90 | + arrival_iata_airport_code, |
| 91 | + count( |
| 92 | + case when arrival_actual_ingate_timeliness IN ('OnTime', 'Early') THEN 1 END |
| 93 | + ) / COUNT(*) * 100 as punctual_pct |
| 94 | + from oag_flight_status_data_sample.public.flight_status_latest_sample |
| 95 | + where arrival_actual_ingate_timeliness is not null |
| 96 | + group by departure_iata_airport_code, arrival_iata_airport_code |
| 97 | + """, |
| 98 | + ), |
| 99 | + # When joining the flight emissions with the punctuality view, |
| 100 | + # we filter for flights starting from the airport closest to where we live. |
| 101 | + # This information is provided in the tiny JSON file data/home.json which we query directly in the view. |
| 102 | + View( |
| 103 | + name="flights_from_home", |
| 104 | + columns=[ |
| 105 | + ViewColumn(name="departure_airport"), |
| 106 | + ViewColumn(name="arrival_airport"), |
| 107 | + ViewColumn(name="arrival_city"), |
| 108 | + ViewColumn(name="co2_emissions_kg_per_person"), |
| 109 | + ViewColumn(name="punctual_pct"), |
| 110 | + ], |
| 111 | + query=""" |
| 112 | + select |
| 113 | + departure_airport, |
| 114 | + arrival_airport, |
| 115 | + get_city_for_airport(arrival_airport) arrival_city, |
| 116 | + co2_emissions_kg_per_person, |
| 117 | + punctual_pct, |
| 118 | + from flight_emissions |
| 119 | + join flight_punctuality |
| 120 | + on departure_airport = departure_iata_airport_code |
| 121 | + and arrival_airport = arrival_iata_airport_code |
| 122 | + where departure_airport = ( |
| 123 | + select $1:airport |
| 124 | + from @quickstart_common.public.quickstart_repo/branches/main/data/home.json |
| 125 | + (FILE_FORMAT => bronze.json_format)) |
| 126 | + """, |
| 127 | + ), |
| 128 | + # Weather Source provides a weather forecast for the upcoming two weeks. |
| 129 | + # As the free versions of the data sets we use do not cover the entire globe, |
| 130 | + # we limit our pipeline to zip codes inside the US and compute the average |
| 131 | + # temperature, humidity, precipitation probability and cloud coverage. |
| 132 | + View( |
| 133 | + name="weather_forecast", |
| 134 | + columns=[ |
| 135 | + ViewColumn(name="postal_code"), |
| 136 | + ViewColumn(name="avg_temperature_air_f"), |
| 137 | + ViewColumn(name="avg_relative_humidity_pct"), |
| 138 | + ViewColumn(name="avg_cloud_cover_pct"), |
| 139 | + ViewColumn(name="precipitation_probability_pct"), |
| 140 | + ], |
| 141 | + query=""" |
| 142 | + select |
| 143 | + postal_code, |
| 144 | + avg(avg_temperature_air_2m_f) avg_temperature_air_f, |
| 145 | + avg(avg_humidity_relative_2m_pct) avg_relative_humidity_pct, |
| 146 | + avg(avg_cloud_cover_tot_pct) avg_cloud_cover_pct, |
| 147 | + avg(probability_of_precipitation_pct) precipitation_probability_pct |
| 148 | + from global_weather__climate_data_for_bi.standard_tile.forecast_day |
| 149 | + where country = 'US' |
| 150 | + group by postal_code |
| 151 | + """, |
| 152 | + ), |
| 153 | + # We use the data provided by Cybersyn to limit our pipeline to US cities with atleast |
| 154 | + # 100k residents to enjoy all the benefits a big city provides during our vacation. |
| 155 | + View( |
| 156 | + name="major_us_cities", |
| 157 | + columns=[ |
| 158 | + ViewColumn(name="geo_id"), |
| 159 | + ViewColumn(name="geo_name"), |
| 160 | + ViewColumn(name="total_population"), |
| 161 | + ], |
| 162 | + query=""" |
| 163 | + select |
| 164 | + geo.geo_id, |
| 165 | + geo.geo_name, |
| 166 | + max(ts.value) total_population |
| 167 | + from global_government.cybersyn.datacommons_timeseries ts |
| 168 | + join global_government.cybersyn.geography_index geo |
| 169 | + on ts.geo_id = geo.geo_id |
| 170 | + join global_government.cybersyn.geography_relationships geo_rel |
| 171 | + on geo_rel.related_geo_id = geo.geo_id |
| 172 | + where true |
| 173 | + and ts.variable_name = 'Total Population, census.gov' |
| 174 | + and date >= '2020-01-01' |
| 175 | + and geo.level = 'City' |
| 176 | + and geo_rel.geo_id = 'country/USA' |
| 177 | + and value > 100000 |
| 178 | + group by geo.geo_id, geo.geo_name |
| 179 | + order by total_population desc |
| 180 | + """, |
| 181 | + ), |
| 182 | + # Using the geography relationships provided by Cybersyn we collect all the |
| 183 | + # zip codes belonging to a city. |
| 184 | + View( |
| 185 | + name="zip_codes_in_city", |
| 186 | + columns=[ |
| 187 | + ViewColumn(name="city_geo_id"), |
| 188 | + ViewColumn(name="city_geo_name"), |
| 189 | + ViewColumn(name="zip_geo_id"), |
| 190 | + ViewColumn(name="zip_geo_name"), |
| 191 | + ], |
| 192 | + query=""" |
| 193 | + select |
| 194 | + city.geo_id city_geo_id, |
| 195 | + city.geo_name city_geo_name, |
| 196 | + city.related_geo_id zip_geo_id, |
| 197 | + city.related_geo_name zip_geo_name |
| 198 | + from us_addresses__poi.cybersyn.geography_relationships country |
| 199 | + join us_addresses__poi.cybersyn.geography_relationships city |
| 200 | + on country.related_geo_id = city.geo_id |
| 201 | + where true |
| 202 | + and country.geo_id = 'country/USA' |
| 203 | + and city.level = 'City' |
| 204 | + and city.related_level = 'CensusZipCodeTabulationArea' |
| 205 | + order by city_geo_id |
| 206 | + """, |
| 207 | + ), |
| 208 | + View( |
| 209 | + name="weather_joined_with_major_cities", |
| 210 | + columns=[ |
| 211 | + ViewColumn(name="geo_id"), |
| 212 | + ViewColumn(name="geo_name"), |
| 213 | + ViewColumn(name="total_population"), |
| 214 | + ViewColumn(name="avg_temperature_air_f"), |
| 215 | + ViewColumn(name="avg_relative_humidity_pct"), |
| 216 | + ViewColumn(name="avg_cloud_cover_pct"), |
| 217 | + ViewColumn(name="precipitation_probability_pct"), |
| 218 | + ], |
| 219 | + query=""" |
| 220 | + select |
| 221 | + city.geo_id, |
| 222 | + city.geo_name, |
| 223 | + city.total_population, |
| 224 | + avg(avg_temperature_air_f) avg_temperature_air_f, |
| 225 | + avg(avg_relative_humidity_pct) avg_relative_humidity_pct, |
| 226 | + avg(avg_cloud_cover_pct) avg_cloud_cover_pct, |
| 227 | + avg(precipitation_probability_pct) precipitation_probability_pct |
| 228 | + from major_us_cities city |
| 229 | + join zip_codes_in_city zip on city.geo_id = zip.city_geo_id |
| 230 | + join weather_forecast weather on zip.zip_geo_name = weather.postal_code |
| 231 | + group by city.geo_id, city.geo_name, city.total_population |
| 232 | + """, |
| 233 | + ), |
| 234 | + # Placeholder: Add new view definition here |
| 235 | +] |
| 236 | + |
| 237 | + |
| 238 | +# entry point for PythonAPI |
| 239 | +root = Root(Session.builder.getOrCreate()) |
| 240 | + |
| 241 | +# create views in Snowflake |
| 242 | +silver_schema = root.databases["quickstart_prod"].schemas["silver"] |
| 243 | +silver_schema.user_defined_functions.create( |
| 244 | + map_city_to_airport, mode=CreateMode.or_replace |
| 245 | +) |
| 246 | +for view in pipeline: |
| 247 | + silver_schema.views.create(view, mode=CreateMode.or_replace) |
0 commit comments