Skip to content

Commit d40a7c3

Browse files
authored
support for GTFS Realtime (#2)
This adds basic support for GTFS Realtime.
1 parent 7708fba commit d40a7c3

File tree

10 files changed

+2115
-4
lines changed

10 files changed

+2115
-4
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ require (
1313
)
1414

1515
require (
16+
github.com/MobilityData/gtfs-realtime-bindings/golang/gtfs v1.0.0 // indirect
1617
github.com/davecgh/go-spew v1.1.1 // indirect
1718
github.com/inconshreveable/mousetrap v1.1.0 // indirect
1819
github.com/pmezard/go-difflib v1.0.0 // indirect
1920
github.com/spf13/pflag v1.0.5 // indirect
21+
google.golang.org/protobuf v1.31.0 // indirect
2022
gopkg.in/yaml.v3 v3.0.1 // indirect
2123
)

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
github.com/MobilityData/gtfs-realtime-bindings/golang/gtfs v1.0.0 h1:f4P+fVYmSIWj4b/jvbMdmrmsx/Xb+5xCpYYtVXOdKoc=
2+
github.com/MobilityData/gtfs-realtime-bindings/golang/gtfs v1.0.0/go.mod h1:nSmbVVQSM4lp9gYvVaaTotnRxSwZXEdFnJARofg5V4g=
13
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
24
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
35
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
46
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
57
github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d h1:KbPOUXFUDJxwZ04vbmDOc3yuruGvVO+LOa7cVER3yWw=
68
github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
9+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
10+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
711
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
812
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
913
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
@@ -25,6 +29,12 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
2529
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2630
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
2731
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
32+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
33+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
34+
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
35+
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
36+
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
37+
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
2838
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2939
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3040
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

parse/realtime.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package parse
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
gtfsproto "github.com/MobilityData/gtfs-realtime-bindings/golang/gtfs"
9+
proto "google.golang.org/protobuf/proto"
10+
)
11+
12+
type StopTimeUpdateScheduleRelationship int
13+
14+
const (
15+
StopTimeUpdateScheduled StopTimeUpdateScheduleRelationship = iota
16+
StopTimeUpdateSkipped
17+
StopTimeUpdateNoData
18+
)
19+
20+
type StopTimeUpdate struct {
21+
TripID string
22+
StopID string
23+
StopSequence uint32
24+
ArrivalIsSet bool
25+
ArrivalTime time.Time
26+
ArrivalDelay time.Duration
27+
DepartureIsSet bool
28+
DepartureTime time.Time
29+
DepartureDelay time.Duration
30+
Type StopTimeUpdateScheduleRelationship
31+
}
32+
33+
// Contains key data from a GTFS Realtime feed
34+
type Realtime struct {
35+
SkippedTrips map[string]bool
36+
Updates []*StopTimeUpdate
37+
38+
// These exist to simplify debugging down the road
39+
NumScheduledTrips int
40+
NumAddedTrips int
41+
NumUnscheduledTrips int
42+
NumCanceledTrips int
43+
NumDuplicatedTrips int
44+
}
45+
46+
func ParseRealtime(ctx context.Context, feeds [][]byte) (*Realtime, error) {
47+
rt := &Realtime{
48+
SkippedTrips: map[string]bool{},
49+
Updates: []*StopTimeUpdate{},
50+
}
51+
52+
for _, feed := range feeds {
53+
// Unmarshal proto
54+
f := &gtfsproto.FeedMessage{}
55+
err := proto.Unmarshal(feed, f)
56+
if err != nil {
57+
return nil, fmt.Errorf("unmarshaling protobuf: %w", err)
58+
}
59+
60+
// Validate header
61+
header := f.GetHeader()
62+
63+
version := header.GetGtfsRealtimeVersion()
64+
if version != "2.0" && version != "1.0" {
65+
return nil, fmt.Errorf("version %s not supported", version)
66+
}
67+
68+
if header.GetIncrementality() != gtfsproto.FeedHeader_FULL_DATASET {
69+
return nil, fmt.Errorf("feed incrementality %s not supported", header.GetIncrementality())
70+
}
71+
72+
// Process the feed entities
73+
err = processEntities(ctx, rt, f.GetEntity())
74+
if err != nil {
75+
return nil, fmt.Errorf("processing entities: %w", err)
76+
}
77+
}
78+
79+
return rt, nil
80+
}
81+
82+
func processEntities(ctx context.Context, rt *Realtime, entities []*gtfsproto.FeedEntity) error {
83+
for _, entity := range entities {
84+
// We only care about TripUpdates
85+
if entity.TripUpdate == nil {
86+
continue
87+
}
88+
89+
trip := entity.TripUpdate.Trip
90+
if trip == nil {
91+
return fmt.Errorf("trip_update missing trip")
92+
}
93+
94+
// Blank trip ID is allowed when (route_id,
95+
// direction_id, start_time, start_date) is provided
96+
// and uniquely identifies the trip in the static
97+
// schedule. Also allowed for frequency based trips.
98+
//
99+
// That said, we don't support it.
100+
if trip.GetTripId() == "" {
101+
continue
102+
}
103+
104+
switch sr := trip.GetScheduleRelationship(); sr {
105+
106+
case gtfsproto.TripDescriptor_SCHEDULED:
107+
// Trip running in accordance with GTFS schedule
108+
for _, update := range entity.TripUpdate.GetStopTimeUpdate() {
109+
err := processStopTimeUpdate(ctx, rt, trip.GetTripId(), update)
110+
if err != nil {
111+
return fmt.Errorf("processing stop time update: %w", err)
112+
}
113+
}
114+
rt.NumScheduledTrips++
115+
116+
case gtfsproto.TripDescriptor_ADDED:
117+
// An extra trip that's been added. Not supported!
118+
rt.NumAddedTrips++
119+
120+
case gtfsproto.TripDescriptor_UNSCHEDULED:
121+
// For frequency based trips only. Not supported!
122+
rt.NumUnscheduledTrips++
123+
124+
case gtfsproto.TripDescriptor_CANCELED:
125+
// Trip in GTFS schedule that has been canceled.
126+
rt.SkippedTrips[trip.GetTripId()] = true
127+
rt.NumCanceledTrips++
128+
129+
case gtfsproto.TripDescriptor_DUPLICATED:
130+
// Copy of a trip in GTFS schedule. Not supported!
131+
rt.NumDuplicatedTrips++
132+
133+
}
134+
}
135+
136+
return nil
137+
}
138+
139+
func processStopTimeUpdate(
140+
ctx context.Context,
141+
rt *Realtime,
142+
tripID string,
143+
update *gtfsproto.TripUpdate_StopTimeUpdate,
144+
) error {
145+
146+
var arrivalIsSet bool
147+
var arrivalTime time.Time
148+
var arrivalDelay time.Duration
149+
var departureIsSet bool
150+
var departureTime time.Time
151+
var departureDelay time.Duration
152+
153+
if update.Arrival != nil {
154+
arrivalIsSet = true
155+
arrivalUnix := int64(update.GetArrival().GetTime())
156+
if arrivalUnix != 0 {
157+
arrivalTime = time.Unix(arrivalUnix, 0).UTC()
158+
}
159+
arrivalDelay = time.Duration(update.GetArrival().GetDelay()) * time.Second
160+
}
161+
162+
if update.Departure != nil {
163+
departureIsSet = true
164+
departureUnix := int64(update.GetDeparture().GetTime())
165+
if departureUnix != 0 {
166+
departureTime = time.Unix(departureUnix, 0).UTC()
167+
}
168+
departureDelay = time.Duration(update.GetDeparture().GetDelay()) * time.Second
169+
}
170+
171+
stup := &StopTimeUpdate{
172+
TripID: tripID,
173+
StopID: update.GetStopId(),
174+
StopSequence: uint32(update.GetStopSequence()),
175+
ArrivalIsSet: arrivalIsSet,
176+
ArrivalTime: arrivalTime,
177+
ArrivalDelay: arrivalDelay,
178+
DepartureIsSet: departureIsSet,
179+
DepartureTime: departureTime,
180+
DepartureDelay: departureDelay,
181+
}
182+
183+
if stup.StopID == "" && stup.StopSequence == 0 {
184+
// XXX: StopSequence 0 is actually allowed by
185+
// spec. This may cause problems.
186+
return fmt.Errorf("stop_time_update missing stop_id and stop_sequence")
187+
}
188+
189+
switch sr := update.GetScheduleRelationship(); sr {
190+
191+
case gtfsproto.TripUpdate_StopTimeUpdate_SCHEDULED:
192+
// Vehicle will stop according to GTFS schedule, but
193+
// possibly with delay.
194+
stup.Type = StopTimeUpdateScheduled
195+
rt.Updates = append(rt.Updates, stup)
196+
197+
case gtfsproto.TripUpdate_StopTimeUpdate_SKIPPED:
198+
// Stop skipped
199+
stup.Type = StopTimeUpdateSkipped
200+
rt.Updates = append(rt.Updates, stup)
201+
202+
case gtfsproto.TripUpdate_StopTimeUpdate_NO_DATA:
203+
// No data for this stop
204+
stup.Type = StopTimeUpdateNoData
205+
rt.Updates = append(rt.Updates, stup)
206+
207+
case gtfsproto.TripUpdate_StopTimeUpdate_UNSCHEDULED:
208+
// For frequency based trips. Not supported!
209+
}
210+
211+
return nil
212+
}

0 commit comments

Comments
 (0)