Skip to content

Commit c1c3d5b

Browse files
authored
Merge pull request data-integrations#619 from cloudsufi/bugfix/PLUGIN-1925
PLUGIN-1925: Introduce Custom BigDecimal Splitter
2 parents 491770b + 8a24390 commit c1c3d5b

File tree

5 files changed

+166
-2
lines changed

5 files changed

+166
-2
lines changed

database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2929
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
3030
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
31+
import org.apache.hadoop.mapreduce.lib.db.DBSplitter;
3132
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3233
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
3334
import org.slf4j.Logger;
@@ -39,6 +40,7 @@
3940
import java.sql.DriverManager;
4041
import java.sql.SQLException;
4142
import java.sql.Statement;
43+
import java.sql.Types;
4244
import java.util.Properties;
4345

4446
/**
@@ -128,6 +130,15 @@ public Connection createConnection() {
128130
return getConnection();
129131
}
130132

133+
@Override
134+
protected DBSplitter getSplitter(int sqlDataType) {
135+
// Use SafeBigDecimalSplitter for columns having high precision decimal or numeric columns
136+
if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) {
137+
return new SafeBigDecimalSplitter();
138+
}
139+
return super.getSplitter(sqlDataType);
140+
}
141+
131142
@Override
132143
public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
133144
final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
 * Copyright © 2025 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.db.source;

import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;

import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Safe implementation of {@link BigDecimalSplitter} to ensure precise division of BigDecimal values while calculating
 * split points for NUMERIC and DECIMAL types.
 *
 * <p>Problem: The default {@link BigDecimalSplitter} implementation may return 0 when the numerator is smaller than the
 * denominator (e.g., 1 / 4 = 0), due to the lack of a defined scale for division. Since the result (0) is smaller than
 * {@link BigDecimalSplitter#MIN_INCREMENT} (i.e. {@code 10000 * Double.MIN_VALUE}), the split size defaults to
 * {@code MIN_INCREMENT}, leading to an excessive number of splits (~10M) and potential OOM errors.</p>
 *
 * <p>Fix: This implementation derives the division scale from the scales of the two operands, adds a buffer of
 * {@value #SCALE_BUFFER} decimal places, and uses {@link RoundingMode#HALF_UP} as the rounding mode.</p>
 *
 * <p>Note: This class is used by {@link DataDrivenETLDBInputFormat}.</p>
 */
public class SafeBigDecimalSplitter extends BigDecimalSplitter {

  /* An additional buffer of +5 digits is applied to preserve accuracy during division. */
  public static final int SCALE_BUFFER = 5;

  /**
   * Performs safe division with correct scale handling.
   *
   * <p>The result scale is the larger of the two operands' scales plus {@link #SCALE_BUFFER}, which guarantees a
   * non-zero quotient for any non-zero numerator and keeps enough precision for split-point arithmetic.</p>
   *
   * @param numerator the dividend (BigDecimal)
   * @param denominator the divisor (BigDecimal)
   * @return quotient with derived scale
   * @throws ArithmeticException if denominator is zero
   */
  @Override
  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
    // Determine the required scale for the division and add a buffer to ensure accuracy
    int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + SCALE_BUFFER;
    return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP);
  }
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
 * Copyright © 2025 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.db.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
import org.junit.Test;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Test class for {@link SafeBigDecimalSplitter}
 */
public class SafeBigDecimalSplitterTest {
  private final SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();

  /**
   * Runs a full split computation against mocked boundary values.
   *
   * @param min lower bound returned by the boundary query (column 1)
   * @param max upper bound returned by the boundary query (column 2)
   * @param numSplits requested map count exposed via "mapreduce.job.maps"
   * @return the splits produced by {@link SafeBigDecimalSplitter#split}
   */
  private List<InputSplit> computeSplits(BigDecimal min, BigDecimal max, int numSplits) throws SQLException {
    ResultSet boundary = mock(ResultSet.class);
    when(boundary.getBigDecimal(1)).thenReturn(min);
    when(boundary.getBigDecimal(2)).thenReturn(max);
    Configuration conf = mock(Configuration.class);
    when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
    BigDecimalSplitter target = new SafeBigDecimalSplitter();
    return target.split(conf, boundary, "id");
  }

  @Test
  public void testSmallRangeDivision() {
    // 1/4 must come back as 0.25 at the buffered scale, not truncate to zero.
    BigDecimal quotient = splitter.tryDivide(BigDecimal.ONE, new BigDecimal("4"));
    assertEquals(new BigDecimal("0.25000"), quotient);
  }

  @Test
  public void testLargePrecision() {
    // A high-precision numerator divided by 3 must stay strictly positive.
    BigDecimal dividend = new BigDecimal("1.0000000000000000001");
    BigDecimal divisor = new BigDecimal("3");
    assertTrue(splitter.tryDivide(dividend, divisor).compareTo(BigDecimal.ZERO) > 0);
  }

  @Test
  public void testDivisionByZero() {
    // Division by zero is propagated rather than masked.
    assertThrows(ArithmeticException.class, () ->
        splitter.tryDivide(BigDecimal.ONE, BigDecimal.ZERO));
  }

  @Test
  public void testDivisionWithZeroNumerator() {
    // when minVal == maxVal
    BigDecimal quotient = splitter.tryDivide(BigDecimal.ZERO, BigDecimal.ONE);
    assertEquals(0, quotient.compareTo(BigDecimal.ZERO));
  }

  @Test
  public void testSplits() throws SQLException {
    // A [1, 2] range requested as 4 maps should yield exactly 4 splits.
    int requested = 4;
    List<InputSplit> actualSplits = computeSplits(BigDecimal.valueOf(1), BigDecimal.valueOf(2), requested);
    assertEquals(requested, actualSplits.size());
  }

  @Test
  public void testSplitsWithMinValueEqualToMaxValue() throws SQLException {
    // when minVal == maxVal a single split must still be produced
    int requested = 1;
    List<InputSplit> actualSplits = computeSplits(BigDecimal.valueOf(1), BigDecimal.valueOf(1), requested);
    assertEquals(requested, actualSplits.size());
  }
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
7979
<!-- version properties -->
8080
<cdap.version>6.11.0</cdap.version>
81-
<cdap.plugin.version>2.13.1-SNAPSHOT</cdap.plugin.version>
81+
<cdap.plugin.version>2.13.1</cdap.plugin.version>
8282
<guava.version>13.0.1</guava.version>
8383
<hadoop.version>3.3.6</hadoop.version>
8484
<hsql.version>2.2.4</hsql.version>

postgresql-plugin/src/e2e-test/resources/errorMessage.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ errorMessageBlankPassword=SQL error while getting query schema: The server reque
1616
errorMessageInvalidPassword=SQL error while getting query schema: FATAL: password authentication failed for user
1717
errorMessageInvalidSourceHost=SQL error while getting query schema: The connection attempt failed.
1818
errorMessageInvalidTableName=Table 'table' does not exist. Ensure table '"table"' is set correctly and that the
19-
errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08004', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The server requested SCRAM-based authentication
19+
errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '3D000', errorCode: '0', errorMessage: SQL Exception occurred
2020
errorMessageInvalidHost=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08001', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The connection attempt failed.', SQLState='08001', ErrorCode='0'].'
2121
errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'PostgreSQL' encountered : \
2222
java.io.IOException: The column index is out of range: 1, number of columns: 0.. Please check the system logs for more details.

0 commit comments

Comments (0)