Skip to content

Commit 7b647be

Browse files
authored
fix: repair windows cni lock issue (Azure#1712)
* Moving the lock from InitializeKeyValueStore() function to restore/save functions to improve cni performance on windows. * fix: use defer function to unlock statefile. * fix: fixing the IPAM lock and defer func * fix: Optimizing cni file lock by moving SetSdnRemoteArpMacAddress() on startup for CRD and MultitenantCRD mode. * adding store lock on telemetry service start to avoid race condition on windows.
1 parent abd0772 commit 7b647be

File tree

7 files changed

+147
-29
lines changed

7 files changed

+147
-29
lines changed

cni/network/plugin/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,10 @@ func rootExecute() error {
225225
// Start telemetry process if not already started. This should be done inside lock, otherwise multiple process
226226
// end up creating/killing telemetry process results in undesired state.
227227
tb = telemetry.NewTelemetryBuffer()
228-
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
228+
if err = tb.ConnectCNIToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds, netPlugin.Plugin); err != nil {
229+
log.Errorf("connection to telemetry service failed.")
230+
}
229231
defer tb.Close()
230-
231232
netPlugin.SetCNIReport(cniReport, tb)
232233

233234
t := time.Now()

cni/plugin.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,15 @@ func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error
174174
}
175175
}
176176

177-
// Acquire store lock.
178-
if err := plugin.Store.Lock(store.DefaultLockTimeout); err != nil {
179-
log.Printf("[cni] Failed to lock store: %v.", err)
180-
return err
177+
// Acquiring store lock at this stage only for Linux. For optimization purpuses on Windows, the store lock will take place at later stage.
178+
// TODO: The Linux store locking should be removed from here as well after some more validation.
179+
if runtime.GOOS != "windows" {
180+
// Acquire store lock.
181+
if err := plugin.Store.Lock(store.DefaultLockTimeout); err != nil {
182+
log.Printf("[cni] Failed to lock store: %v.", err)
183+
return errors.Wrap(err, "error Acquiring store lock")
184+
}
181185
}
182-
183186
config.Store = plugin.Store
184187

185188
return nil
@@ -198,3 +201,25 @@ func (plugin *Plugin) UninitializeKeyValueStore() error {
198201

199202
return nil
200203
}
204+
205+
// Lock key-value store. This function is being used for locking access to TelemetryService start for Windows Runtime.
206+
func (plugin *Plugin) LockKeyValueStore() error {
207+
// Acquire store lock.
208+
if err := plugin.Store.Lock(store.DefaultLockTimeout); err != nil {
209+
log.Printf("[cni] Failed to lock store: %v.", err)
210+
return errors.Wrap(err, "error Acquiring store lock")
211+
}
212+
return nil
213+
}
214+
215+
// Unlock key-value store
216+
func (plugin *Plugin) UnLockKeyValueStore() error {
217+
if plugin.Store != nil {
218+
err := plugin.Store.Unlock()
219+
if err != nil {
220+
log.Printf("[cni] Failed to unlock store: %v.", err)
221+
return errors.Wrap(err, "error unlock store lock")
222+
}
223+
}
224+
return nil
225+
}

cns/restserver/api.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/Azure/azure-container-networking/cns/types"
2121
"github.com/Azure/azure-container-networking/cns/wireserver"
2222
"github.com/Azure/azure-container-networking/nmagent"
23-
"github.com/Azure/azure-container-networking/platform"
2423
"github.com/pkg/errors"
2524
)
2625

@@ -885,14 +884,6 @@ func (service *HTTPRestService) getNetworkContainerByOrchestratorContext(w http.
885884
return
886885
}
887886

888-
// getNetworkContainerByOrchestratorContext gets called for multitenancy and
889-
// setting the SDNRemoteArpMacAddress regKey is essential for the multitenancy
890-
// to work correctly in case of windows platform. Return if there is an error
891-
if err = platform.SetSdnRemoteArpMacAddress(); err != nil {
892-
logger.Printf("[Azure CNS] SetSdnRemoteArpMacAddress failed with error: %s", err.Error())
893-
return
894-
}
895-
896887
getNetworkContainerResponse := service.getNetworkContainerResponse(req)
897888
returnCode := getNetworkContainerResponse.Response.ReturnCode
898889
err = service.Listener.Encode(w, &getNetworkContainerResponse)

cns/service/main.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,13 @@ func main() {
660660
}
661661
}
662662

663+
// Setting the remote ARP MAC address to 12-34-56-78-9a-bc on windows for external traffic
664+
err = platform.SetSdnRemoteArpMacAddress()
665+
if err != nil {
666+
logger.Errorf("Failed to set remote ARP MAC address: %v", err)
667+
return
668+
}
669+
663670
// Initialze state in if CNS is running in CRD mode
664671
// State must be initialized before we start HTTPRestService
665672
if config.ChannelMode == cns.CRD {
@@ -697,12 +704,6 @@ func main() {
697704
return
698705
}
699706

700-
// Setting the remote ARP MAC address to 12-34-56-78-9a-bc on windows for external traffic
701-
err = platform.SetSdnRemoteArpMacAddress()
702-
if err != nil {
703-
logger.Errorf("Failed to set remote ARP MAC address: %v", err)
704-
return
705-
}
706707
}
707708

708709
// Initialize multi-tenant controller if the CNS is running in MultiTenantCRD mode.
@@ -714,12 +715,6 @@ func main() {
714715
return
715716
}
716717

717-
// Setting the remote ARP MAC address to 12-34-56-78-9a-bc on windows for external traffic
718-
err = platform.SetSdnRemoteArpMacAddress()
719-
if err != nil {
720-
logger.Errorf("Failed to set remote ARP MAC address: %v", err)
721-
return
722-
}
723718
}
724719

725720
logger.Printf("[Azure CNS] Start HTTP listener")

ipam/manager.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
package ipam
55

66
import (
7+
"runtime"
78
"sync"
89
"time"
910

1011
"github.com/Azure/azure-container-networking/common"
1112
"github.com/Azure/azure-container-networking/log"
1213
"github.com/Azure/azure-container-networking/platform"
1314
"github.com/Azure/azure-container-networking/store"
15+
"github.com/pkg/errors"
1416
)
1517

1618
const (
@@ -100,6 +102,22 @@ func (am *addressManager) restore(rehydrateIpamInfoOnReboot bool) error {
100102
return nil
101103
}
102104

105+
// Acquiring store lock at this stage for optimization purpuses on Windows
106+
if runtime.GOOS == "windows" {
107+
// Acquire store lock.
108+
if err := am.store.Lock(store.DefaultLockTimeout); err != nil {
109+
log.Printf("[ipam] Failed to lock store: %v.", err)
110+
return errors.Wrap(err, "error Acquiring store lock")
111+
}
112+
// Remove the lock on the key-value store
113+
defer func() {
114+
err := am.store.Unlock()
115+
if err != nil {
116+
log.Printf("[ipam] Failed to unlock store: %v.", err)
117+
}
118+
}()
119+
}
120+
103121
// Read any persisted state.
104122
err := am.store.Read(storeKey, am)
105123
if err != nil {
@@ -169,6 +187,22 @@ func (am *addressManager) save() error {
169187
// Update time stamp.
170188
am.TimeStamp = time.Now()
171189

190+
// Acquiring store lock at this stage for optimization purpuses on Windows
191+
if runtime.GOOS == "windows" {
192+
// Acquire store lock.
193+
if err := am.store.Lock(store.DefaultLockTimeout); err != nil {
194+
log.Printf("[ipam] Failed to lock store: %v.", err)
195+
return errors.Wrap(err, "error Acquiring store lock")
196+
}
197+
// Remove the lock on the key-value store
198+
defer func() {
199+
err := am.store.Unlock()
200+
if err != nil {
201+
log.Printf("[ipam] Failed to unlock store: %v.", err)
202+
}
203+
}()
204+
}
205+
172206
log.Printf("[ipam] saving ipam state.\n")
173207
err := am.store.Write(storeKey, am)
174208
if err == nil {

network/manager.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package network
55

66
import (
77
"net"
8+
"runtime"
89
"sync"
910
"time"
1011

@@ -15,6 +16,7 @@ import (
1516
"github.com/Azure/azure-container-networking/netlink"
1617
"github.com/Azure/azure-container-networking/platform"
1718
"github.com/Azure/azure-container-networking/store"
19+
"github.com/pkg/errors"
1820
)
1921

2022
const (
@@ -127,6 +129,22 @@ func (nm *networkManager) restore(isRehydrationRequired bool) error {
127129
// After a reboot, all address resources are implicitly released.
128130
// Ignore the persisted state if it is older than the last reboot time.
129131

132+
// Acquiring store lock at this stage for optimization purpuses on Windows
133+
if runtime.GOOS == "windows" {
134+
// Acquire store lock.
135+
if err := nm.store.Lock(store.DefaultLockTimeout); err != nil {
136+
log.Printf("[cni] Failed to lock store: %v.", err)
137+
return errors.Wrap(err, "error Acquiring store lock")
138+
}
139+
// Remove the lock on the key-value store
140+
defer func() {
141+
err := nm.store.Unlock()
142+
if err != nil {
143+
log.Printf("[cni] Failed to unlock store: %v.", err)
144+
}
145+
}()
146+
}
147+
130148
// Read any persisted state.
131149
err := nm.store.Read(storeKey, nm)
132150
if err != nil {
@@ -170,7 +188,6 @@ func (nm *networkManager) restore(isRehydrationRequired bool) error {
170188
for extIfName := range nm.ExternalInterfaces {
171189
delete(nm.ExternalInterfaces, extIfName)
172190
}
173-
174191
return nil
175192
}
176193
}
@@ -225,6 +242,22 @@ func (nm *networkManager) save() error {
225242
// Update time stamp.
226243
nm.TimeStamp = time.Now()
227244

245+
// Acquiring store lock at this stage for optimization purpuses on Windows
246+
if runtime.GOOS == "windows" {
247+
// Acquire store lock.
248+
if err := nm.store.Lock(store.DefaultLockTimeout); err != nil {
249+
log.Printf("[cni] Failed to lock store: %v.", err)
250+
return errors.Wrap(err, "error Acquiring store lock")
251+
}
252+
// Remove the lock on the key-value store
253+
defer func() {
254+
err := nm.store.Unlock()
255+
if err != nil {
256+
log.Printf("[cni] Failed to unlock store: %v.", err)
257+
}
258+
}()
259+
}
260+
228261
err := nm.store.Write(storeKey, nm)
229262
if err == nil {
230263
log.Printf("[net] Save succeeded.\n")

telemetry/telemetrybuffer.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@ import (
1111
"net"
1212
"os"
1313
"path/filepath"
14+
"runtime"
1415
"strings"
1516
"sync"
1617
"time"
1718

19+
"github.com/Azure/azure-container-networking/cni"
1820
"github.com/Azure/azure-container-networking/common"
1921
"github.com/Azure/azure-container-networking/log"
2022
"github.com/Azure/azure-container-networking/platform"
23+
"github.com/pkg/errors"
2124
)
2225

2326
// TelemetryConfig - telemetry config read by telemetry service
@@ -319,6 +322,42 @@ func (tb *TelemetryBuffer) ConnectToTelemetryService(telemetryNumRetries, teleme
319322
}
320323
}
321324

325+
// ConnectToTelemetryService for CNI - Attempt to spawn telemetry process if it's not already running. This function will have store lock for CNI.
326+
// TODO: This function should eventually get removed when stateless CNI is developed.
327+
func (tb *TelemetryBuffer) ConnectCNIToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds int, netPlugin *cni.Plugin) error {
328+
path, dir := getTelemetryServiceDirectory()
329+
args := []string{"-d", dir}
330+
for attempt := 0; attempt < 2; attempt++ {
331+
if err := tb.Connect(); err != nil {
332+
log.Logf("Connection to telemetry socket failed: %v", err)
333+
if runtime.GOOS == "windows" {
334+
if err = netPlugin.LockKeyValueStore(); err != nil {
335+
log.Logf("lock acquire error: %v", err)
336+
return errors.Wrap(err, "lock acquire error")
337+
}
338+
}
339+
if err = tb.Cleanup(FdName); err != nil {
340+
return errors.Wrap(err, "cleanup failed")
341+
}
342+
if err = StartTelemetryService(path, args); err != nil {
343+
return errors.Wrap(err, "StartTelemetryService failed")
344+
}
345+
WaitForTelemetrySocket(telemetryNumRetries, time.Duration(telemetryWaitTimeInMilliseconds))
346+
if runtime.GOOS == "windows" {
347+
if err = netPlugin.UnLockKeyValueStore(); err != nil {
348+
log.Logf("failed to relinquish lock error: %v", err)
349+
return errors.Wrap(err, "failed to relinquish lock error")
350+
}
351+
}
352+
} else {
353+
tb.Connected = true
354+
log.Logf("Connected to telemetry service")
355+
return nil
356+
}
357+
}
358+
return nil
359+
}
360+
322361
func getTelemetryServiceDirectory() (path string, dir string) {
323362
path = fmt.Sprintf("%v/%v", CniInstallDir, TelemetryServiceProcessName)
324363
if exists, _ := platform.CheckIfFileExists(path); !exists {

0 commit comments

Comments
 (0)