Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 19 additions & 23 deletions internal/brokers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func NewManager(ctx context.Context, brokersConfPath string, configuredBrokers [
}()

// Connect to the system bus
// Don't call dbus.SystemBus which caches globally system dbus (issues in tests)
bus, err := dbus.ConnectSystemBus()
if err != nil {
return m, err
Expand Down Expand Up @@ -119,6 +118,7 @@ func NewManager(ctx context.Context, brokersConfPath string, configuredBrokers [

// AvailableBrokers returns currently loaded and available brokers in preference order.
func (m *Manager) AvailableBrokers() (r []*Broker) {
r = make([]*Broker, 0, len(m.brokersOrder))
for _, id := range m.brokersOrder {
r = append(r, m.brokers[id])
}
Expand All @@ -145,7 +145,7 @@ func (m *Manager) SetDefaultBrokerForUser(brokerID, username string) error {
func (m *Manager) BrokerForUser(username string) (broker *Broker) {
m.usersToBrokerMu.RLock()
defer m.usersToBrokerMu.RUnlock()
return m.usersToBroker[username]
return m.usersToBroker[strings.ToLower(username)]
}

// BrokerFromSessionID returns broker currently in use for a given transaction sessionID.
Expand All @@ -160,55 +160,52 @@ func (m *Manager) BrokerFromSessionID(id string) (broker *Broker, err error) {

broker, exists := m.transactionsToBroker[id]
if !exists {
return nil, fmt.Errorf("no broker found for session %q", id)
return nil, fmt.Errorf("no broker found for session ID %q", id)
}

return broker, nil
}

// NewSession create a new session for the broker and store the sesssionID on the manager.
func (m *Manager) NewSession(brokerID, username, lang, mode string) (sessionID string, encryptionKey string, err error) {
// NewSession create a new session for the broker and store the sessionID on the manager.
func (m *Manager) NewSession(ctx context.Context, brokerID, username, lang, mode string) (sessionID string, encryptionKey string, err error) {
broker, err := m.brokerFromID(brokerID)
if err != nil {
return "", "", fmt.Errorf("invalid broker: %v", err)
}

sessionID, encryptionKey, err = broker.newSession(context.Background(), username, lang, mode)
sessionID, encryptionKey, err = broker.newSession(ctx, username, lang, mode)
if err != nil {
return "", "", err
}

m.transactionsToBrokerMu.Lock()
defer m.transactionsToBrokerMu.Unlock()
log.Debugf(context.Background(), "%s: New %s session for %q",
sessionID, mode, username)
log.Debugf(ctx, "%s: New %s session for %q", sessionID, mode, username)
m.transactionsToBroker[sessionID] = broker
return sessionID, encryptionKey, nil
}

// EndSession signals the end of the session to the broker associated with the sessionID and then removes the
// session -> broker mapping.
func (m *Manager) EndSession(sessionID string) error {
b, err := m.BrokerFromSessionID(sessionID)
if err != nil {
return err
func (m *Manager) EndSession(ctx context.Context, sessionID string) error {
m.transactionsToBrokerMu.Lock()
broker, exists := m.transactionsToBroker[sessionID]
if !exists {
m.transactionsToBrokerMu.Unlock()
return fmt.Errorf("no broker found for session ID %q", sessionID)
}
delete(m.transactionsToBroker, sessionID)
m.transactionsToBrokerMu.Unlock()

if err = b.endSession(context.Background(), sessionID); err != nil {
if err := broker.endSession(ctx, sessionID); err != nil {
return err
}

m.transactionsToBrokerMu.Lock()
log.Debugf(context.Background(), "%s: End session %q",
sessionID, m.transactionsToBroker[sessionID].Name)
delete(m.transactionsToBroker, sessionID)
m.transactionsToBrokerMu.Unlock()
log.Debugf(ctx, "%s: Ended session", sessionID)
return nil
}

// BrokerExists returns true if the brokerID is known by the manager. It can
// happen that a broker which was stored in the database is not available anymore
// because the user removed the configuration file.
// BrokerExists returns true if the brokerID is known by the manager.
func (m *Manager) BrokerExists(brokerID string) bool {
_, exists := m.brokers[brokerID]
return exists
Expand All @@ -218,8 +215,7 @@ func (m *Manager) BrokerExists(brokerID string) bool {
func (m *Manager) brokerFromID(id string) (broker *Broker, err error) {
broker, exists := m.brokers[id]
if !exists {
return nil, fmt.Errorf("no broker found matching %q", id)
return nil, fmt.Errorf("no broker found matching ID %q", id)
}

return broker, nil
}
Loading