[Draft] Iceberg rename table #7071

Open
wants to merge 5 commits into base: master

extensions/spark/kyuubi-spark-authz/pom.xml (19 additions, 0 deletions)
@@ -37,6 +37,25 @@
     </properties>

     <dependencies>
+
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dimafeng</groupId>
+            <artifactId>testcontainers-scala-mysql_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dimafeng</groupId>
+            <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/MysqlContainerEnv.scala (new file)
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz

import com.dimafeng.testcontainers.MySQLContainer
import org.testcontainers.utility.DockerImageName

trait MysqlContainerEnv {

  val containerDef: MySQLContainer = MySQLContainer.Def(
    dockerImageName = DockerImageName.parse("mysql:5.7"),
    databaseName = "hive_metastore",
    username = "root",
    password = "123456")
    .createContainer()

  def startEngine(): Unit = {
    containerDef.start()
  }

  def stopEngine(): Unit = {
    containerDef.stop()
  }
}
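
For illustration, a suite mixing this trait in is expected to call startEngine() before any test touches the database and stopEngine() once the suite finishes; the JDBC coordinates are then read off the started container. A minimal sketch of that lifecycle (the suite name and assertions are illustrative, not part of this change):

import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

import org.apache.kyuubi.plugin.spark.authz.MysqlContainerEnv

// Illustrative suite showing the intended start/stop lifecycle of the trait.
class MysqlContainerSmokeSuite extends AnyFunSuite
  with BeforeAndAfterAll with MysqlContainerEnv {

  override def beforeAll(): Unit = {
    startEngine() // boots the mysql:5.7 container once for the suite
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    super.afterAll()
    stopEngine() // always release the container, even after failures
  }

  test("container publishes usable JDBC coordinates") {
    // jdbcUrl, username and password are provided by testcontainers-scala's
    // MySQLContainer once the container is running.
    assert(containerDef.jdbcUrl.startsWith("jdbc:mysql://"))
    assert(containerDef.username == "root")
    assert(containerDef.password == "123456")
  }
}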

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
@@ -40,24 +40,40 @@ trait SparkSessionProvider {

   protected val extraSparkConf: SparkConf = new SparkConf()

+  protected val useMysqlEnv: Boolean = false
+
+  def getMysqlJdbcUrl: String = ""
+
+  def getMysqlUsername: String = ""
+
+  def getMysqlPassword: String = ""
+
+  def getDriverClassName: String = ""
+
   protected lazy val spark: SparkSession = {
-    val metastore = {
-      val path = Utils.createTempDir(prefix = "hms")
-      Files.deleteIfExists(path)
-      path
-    }
-    val ret = SparkSession.builder()
+    val sessionBuilder = SparkSession.builder()
       .master("local")
       .config("spark.ui.enabled", "false")
-      .config("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
       .config("spark.sql.catalogImplementation", catalogImpl)
       .config(
         "spark.sql.warehouse.dir",
         Utils.createTempDir("spark-warehouse").toString)
       .config("spark.sql.extensions", sqlExtensions)
       .withExtensions(extension)
       .config(extraSparkConf)
-      .getOrCreate()
+
+    if (!useMysqlEnv) {
+      val metastore = {
+        val path = Utils.createTempDir(prefix = "hms")
+        Files.deleteIfExists(path)
+        path
+      }
+      sessionBuilder.config(
+        "javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=$metastore;create=true")
+    }
+
+    val ret = sessionBuilder.getOrCreate()
     if (catalogImpl == "hive") {
       // Ensure HiveExternalCatalog.client.userName is defaultTableOwner
       UserGroupInformation.createRemoteUser(defaultTableOwner).doAs(
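
One detail this refactoring relies on: SparkSession.Builder.config mutates the builder's internal option map and returns the same builder instance, so the sessionBuilder.config(...) call inside the if branch takes effect without reassignment. A self-contained sketch of that pattern (object name and option values are illustrative):

import org.apache.spark.sql.SparkSession

// Illustrative only: demonstrates that Builder.config mutates in place,
// mirroring the conditional Derby wiring above.
object BuilderBranchSketch {
  def main(args: Array[String]): Unit = {
    val builder = SparkSession.builder()
      .master("local")
      .config("spark.ui.enabled", "false")

    val useEmbeddedMetastore = true
    if (useEmbeddedMetastore) {
      // The returned builder is the same instance, so the result
      // can safely be discarded here.
      builder.config(
        "javax.jdo.option.ConnectionURL",
        "jdbc:derby:;databaseName=/tmp/hms-sketch;create=true")
    }

    val spark = builder.getOrCreate()
    // The conditionally-added option is visible on the session's conf.
    assert(spark.conf.get("javax.jdo.option.ConnectionURL").contains("derby"))
    spark.stop()
  }
}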

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -24,8 +24,8 @@ import scala.util.Try
 import org.apache.spark.sql.Row
 import org.scalatest.Outcome

-// scalastyle:off
 import org.apache.kyuubi.Utils
+// scalastyle:off
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
@@ -39,11 +39,12 @@ import org.apache.kyuubi.util.AssertionUtils._
  */
 @IcebergTest
 class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
-  override protected val catalogImpl: String = "hive"
+  override protected val useMysqlEnv: Boolean = true
+  override protected val catalogImpl: String = "in-memory"
   override protected val sqlExtensions: String =
     "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

-  val catalogV2 = "local"
+  val catalogV2 = "jdbc_catalog"
   val namespace1 = icebergNamespace
   val table1 = "table1"
   val outputTable1 = "outputTable1"
@@ -55,16 +55,20 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
   }

   override def beforeAll(): Unit = {
+    super.beforeAll()
     spark.conf.set(
       s"spark.sql.catalog.$catalogV2",
       "org.apache.iceberg.spark.SparkCatalog")
-    spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop")
+    spark.conf.set(
+      s"spark.sql.catalog.$catalogV2.type",
+      "jdbc")
+    spark.conf.set(s"spark.sql.catalog.$catalogV2.uri", getMysqlJdbcUrl)
+    spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.user", getMysqlUsername)
+    spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.password", getMysqlPassword)
     spark.conf.set(
       s"spark.sql.catalog.$catalogV2.warehouse",
       Utils.createTempDir("iceberg-hadoop").toString)
-
-    super.beforeAll()

     doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1"))
     doAs(
       admin,
@@ -586,4 +591,22 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
       doAs(admin, sql(dropTagSql))
     }
   }
+
+  test("RENAME TABLE for Iceberg") {
+    val table = "partitioned_table"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(
+        admin,
+        sql(
+          s"CREATE TABLE $catalogV2.$namespace1.$table" +
+            s"(id int NOT NULL, name string, city string) USING iceberg"))
+      val renameSql = s"alter table $catalogV2.$namespace1.$table " +
+        s"rename to $namespace1.new_table"
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(renameSql))
+      }(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
+      doAs(admin, sql(renameSql))
+    }
+  }
 }
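
For reference, the catalog wiring that beforeAll performs programmatically corresponds to the following static Spark configuration for an Iceberg JDBC catalog; the URI and credentials below are placeholders for the values the Testcontainers-managed MySQL instance supplies at runtime:

import org.apache.spark.SparkConf

// Illustrative equivalent of the beforeAll wiring above.
object IcebergJdbcCatalogConfSketch {
  val conf: SparkConf = new SparkConf()
    .set("spark.sql.catalog.jdbc_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.jdbc_catalog.type", "jdbc")
    // Placeholder JDBC coordinates; the suite reads the real ones
    // from the started MySQL container.
    .set("spark.sql.catalog.jdbc_catalog.uri", "jdbc:mysql://localhost:3306/hive_metastore")
    .set("spark.sql.catalog.jdbc_catalog.jdbc.user", "root")
    .set("spark.sql.catalog.jdbc_catalog.jdbc.password", "123456")
    .set("spark.sql.catalog.jdbc_catalog.warehouse", "/tmp/iceberg-warehouse")
}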

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -38,20 +38,48 @@ import org.scalatest.funsuite.AnyFunSuite

 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.plugin.spark.authz.{AccessControlException, SparkSessionProvider}
+import org.apache.kyuubi.plugin.spark.authz.MysqlContainerEnv
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.rule.Authorization.KYUUBI_AUTHZ_TAG
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.util.AssertionUtils._
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 abstract class RangerSparkExtensionSuite extends AnyFunSuite
-  with SparkSessionProvider with BeforeAndAfterAll {
+  with SparkSessionProvider with BeforeAndAfterAll with MysqlContainerEnv {
 // scalastyle:on
   override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension

+  var mysqlJdbcUrl = ""
+  var mysqlUsername = ""
+  var mysqlPassword = ""
+  var driverClassName = ""
+
+  override def getMysqlJdbcUrl: String = mysqlJdbcUrl
+
+  override def getMysqlUsername: String = mysqlUsername
+
+  override def getMysqlPassword: String = mysqlPassword
+
+  override def getDriverClassName: String = driverClassName
+
   override def afterAll(): Unit = {
     spark.stop()
     super.afterAll()
+    if (useMysqlEnv) {
+      stopEngine()
+    }
   }
+
+  override def beforeAll(): Unit = {
+    if (useMysqlEnv) {
+      startEngine()
+      this.mysqlJdbcUrl = containerDef.jdbcUrl
+      this.mysqlUsername = containerDef.username
+      this.mysqlPassword = containerDef.password
+      this.driverClassName = containerDef.driverClassName
+    }
+    super.beforeAll()
+  }

   protected def errorMessage(