Skip to content

Commit 417a550

Browse files
utk-12sshardool
authored andcommitted
Fixing bug where proxy user from flow parameter was not included (azkaban#3306)
* Fixing bug where proxy user from flow parameter was not included * Removing variable instances of USER_TO_PROXY * Added separate unit test for flow param and moved check to allow unit testing --------- Co-authored-by: Utkarsh Kattishettar <[email protected]> RB=3898971
1 parent 82abd4f commit 417a550

File tree

4 files changed

+43
-8
lines changed

4 files changed

+43
-8
lines changed

azkaban-common/src/main/java/azkaban/executor/container/ContainerImplUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,13 @@ public static int getFlowNameHashValMapping(ExecutableFlow flow) {
9898
}
9999

100100
public static Set<String> getProxyUsersForFlow(final ProjectManager projectManager,
101-
final ExecutableFlow flow) {
101+
final ExecutableFlow flow, final Map<String, String> flowParam) {
102102
final Set<String> proxyUsers = new HashSet<>();
103+
// First add the proxy user from the top level flow parameter. Adding this here as individual job
104+
// may or may not override this.
105+
if (flowParam != null && flowParam.containsKey(USER_TO_PROXY)) {
106+
proxyUsers.add(flowParam.get(USER_TO_PROXY));
107+
}
103108
// Get the project and flow Object that needs to be used repeatedly in the DAG.
104109
Project project = projectManager.getProject(flow.getProjectId());
105110
Flow flowObj = project.getFlow(flow.getFlowId());
@@ -131,6 +136,8 @@ public static Set<String> getProxyUsersForFlow(final ProjectManager projectManag
131136
// DFS Walk of the Graph to find all the Proxy Users.
132137
populateProxyUsersForFlow(flow, flowObj, project, projectManager, proxyUsers);
133138
proxyUsers.removeAll(Collections.singleton(""));
139+
// Removing instances of variables as USER_TO_PROXY
140+
proxyUsers.removeIf(user -> user.contains("$"));
134141
return proxyUsers;
135142
}
136143

@@ -199,4 +206,4 @@ public static Set<String> getJobTypeUsersForFlow(HashMap<String, String> jobType
199206
}
200207
return jobTypeProxyUserSet;
201208
}
202-
}
209+
}

azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package azkaban.executor.container;
1717

18+
import static azkaban.Constants.JobProperties.*;
1819
import static azkaban.executor.ExecutionControllerUtils.clusterQualifiedExecId;
1920
import static java.util.Objects.requireNonNull;
2021

@@ -1172,18 +1173,18 @@ private void createPod(final int executionId)
11721173
Boolean.parseBoolean(flowParam.get(FlowParameters.PROXY_USER_PREFETCH_ALL));
11731174
/*
11741175
As an optimization for not walking through the DAG and making DB queries for large DAGs we
1175-
try to check if the proxy users list from the project page is less then a certain threshold.
1176+
try to check if the proxy users list from the project page is less than a certain threshold.
11761177
If it is less than this threshold, we simply add all of them. This configuration will be
11771178
custom to a given environment. An option to fetch all of them via the project page is
11781179
also provided with a flow parameter.
11791180
*/
11801181

1181-
if (prefetchAllProxyUserCredentials || flow.getProxyUsers().size() <= this.proxyUserPrefetchThreshold ) {
1182+
if (prefetchAllProxyUserCredentials || flow.getProxyUsers().size() <= this.proxyUserPrefetchThreshold) {
11821183
logger.info("Fetched proxy users from permissions page");
11831184
proxyUsersMap.addAll(flow.getProxyUsers());
11841185
} else{
11851186
Instant proxyUserFetchStartTime = Instant.now();
1186-
proxyUsersMap.addAll(ContainerImplUtils.getProxyUsersForFlow(this.projectManager, flow));
1187+
proxyUsersMap.addAll(ContainerImplUtils.getProxyUsersForFlow(this.projectManager, flow, flowParam));
11871188
logger.info("Fetching proxy users from DAG and took: {} seconds",
11881189
Duration.between(proxyUserFetchStartTime, Instant.now()).getSeconds());
11891190
}

azkaban-common/src/test/java/azkaban/executor/container/KubernetesContainerizedImplTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
import static azkaban.Constants.EventReporterConstants.FLOW_STATUS;
2222
import static azkaban.Constants.EventReporterConstants.VERSION_SET;
2323
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
24-
import static azkaban.executor.container.ContainerImplUtils.getJobTypeUsersForFlow;
25-
import static azkaban.executor.container.ContainerImplUtils.populateProxyUsersForFlow;
24+
import static azkaban.executor.container.ContainerImplUtils.*;
2625
import static org.assertj.core.api.Assertions.assertThat;
2726
import static org.mockito.Mockito.any;
2827
import static org.mockito.Mockito.mock;
@@ -630,6 +629,34 @@ public void testVersionSetConstructionWithRampupManager() throws Exception {
630629
Assert.assertEquals("6.4", versionSet.getVersion("dependency1").get().getVersion());
631630
}
632631

632+
@Test
633+
public void testPopulatingProxyUsersFromFlowParamOnly() throws Exception {
634+
final ExecutableFlow flow = createTestFlow();
635+
flow.setProjectId(1);
636+
ProjectManager projectManager = mock(ProjectManager.class);
637+
Project project = mock(Project.class);
638+
Flow flowObj = mock(Flow.class);
639+
when(flowObj.toString()).thenReturn(flow.getFlowName());
640+
641+
ExecutableNode node1 = new ExecutableNode();
642+
node1.setId("node1");
643+
node1.setJobSource("job1");
644+
node1.setStatus(Status.PREPARING);
645+
Props currentNodeProps1 = new Props();
646+
Props currentNodeJobProps1 = new Props();
647+
648+
when(projectManager.getProject(flow.getProjectId())).thenReturn(project);
649+
when(project.getFlow(flow.getFlowId())).thenReturn(flowObj);
650+
when(projectManager.getProperties(project, flowObj, node1.getId(), node1.getJobSource()))
651+
.thenReturn(currentNodeProps1);
652+
when(projectManager.getJobOverrideProperty(project, flowObj, node1.getId(),
653+
node1.getJobSource()))
654+
.thenReturn(currentNodeJobProps1);
655+
Map<String, String> flowParams = new HashMap<>();
656+
flowParams.put("user.to.proxy", "testuser1");
657+
Set<String> proxyUsers = new HashSet<>(getProxyUsersForFlow(projectManager, flow, flowParams));
658+
Assert.assertTrue(proxyUsers.contains("testuser1"));
659+
}
633660
@Test
634661
public void testPopulatingProxyUsersFromProject() throws Exception {
635662
final ExecutableFlow flow = createTestFlow();

azkaban-web-server/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ node {
2626
npmVersion = '5.6.0'
2727

2828
// Base URL for fetching node distributions (change if you have a mirror).
29-
distBaseUrl = 'https://nodejs.org/dist'
29+
distBaseUrl = 'https://nodejs.org/dist/'
3030

3131
// If true, it will download node using above parameters.
3232
// If false, it will try to use globally installed node.

0 commit comments

Comments
 (0)