diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ee14dd6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +.idea +out +target +kafkatools.iml +.classpath +.project +.settings/ +.vscode/ +exports.sh +env.env +prd.sh +qa.sh +stg.sh + + +testtopic +kafka_client_jaas.config +local.sh +oldway +eclipse-formatter.xml diff --git a/Dockerfile b/Dockerfile index 819b923..2e8bbbc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN ["mvn", "verify"] # Adding source, compile and package into a fat jar ADD src /code/src RUN ["mvn", "package"] - -EXPOSE 3800 -CMD ["/usr/lib/jvm/java-8-openjdk-amd64/bin/java", "-jar", "target/oct-kafka-api-jar-with-dependencies.jar"] +ADD create.sql create.sql +EXPOSE 9000 +CMD ["/usr/lib/jvm/java-8-openjdk-amd64/bin/java", "-jar", "target/kafka-api-jar-with-dependencies.jar"] diff --git a/README.md b/README.md index 86f57ca..b60a248 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,99 @@ ## Synopis -Runs a REST API to offer management of Kafka topics. +Runs a REST API to offer management of Kafka topics, consumergroups, users, and ACLs ## Details -* GET ... (todo) +``` +get /v1/kafka/cluster/:cluster/consumergroups +get /v1/kafka/cluster/:cluster/credentials/:username +get /v1/kafka/cluster/:cluster/topic/:topic +get /v1/kafka/cluster/:cluster/topic/:topic/consumergroups +get /v1/kafka/cluster/:cluster/topics +post /v1/kafka/cluster/:cluster/topic +post /v1/kafka/cluster/:cluster/user +post /v1/kafka/cluster/:cluster/user/:username/topic/:topic/consumergroup/rotate +put /v1/kafka/cluster/:cluster/acl/user/:user/topic/:topic/role/:role +put /v1/kafka/cluster/:cluster/topic/:topic/retentionms/:retentionms +delete /v1/kafka/cluster/:cluster/topic/:topic -## Runtime Environment Variables -* PORT +JSON payload exmample for creation of a topic is + +{ + "topic": { + "name": "tt9", + "description":"User.Friendly.Name", + "organization":"testorg", + "config": { + "cleanup.policy": "delete", + "partitions": 8, + "retention.ms":8888 + } + } +} + +``` + + +## Runtime Environment Variables +``` +SANDBOX_KAFKA_PORT +SANDBOX_DEFAULT_PARTITIONS +SANDBOX_DEFAULT_REPLICAS +SANDBOX_DEFAULT_RETENTION +SANDBOX_KAFKA_ADMIN_USERNAME +SANDBOX_KAFKA_ADMIN_PASSWORD +SANDBOX_KAFKA_LOCATION +SANDBOX_KAFKA_HOSTNAME +SANDBOX_ZK +SANDBOX_KAFKA_AVRO_REGISTRY_LOCATION +SANDBOX_KAFKA_AVRO_REGISTRY_HOSTNAME +SANDBOX_KAFKA_AVRO_REGISTRY_PORT +DEV_KAFKA_PORT +DEV_DEFAULT_PARTITIONS +DEV_DEFAULT_REPLICAS +DEV_DEFAULT_RETENTION +DEV_KAFKA_ADMIN_USERNAME +DEV_KAFKA_ADMIN_PASSWORD +DEV_KAFKA_LOCATION +DEV_KAFKA_HOSTNAME +DEV_ZK +DEV_KAFKA_AVRO_REGISTRY_LOCATION +DEV_KAFKA_AVRO_REGISTRY_HOSTNAME +DEV_KAFKA_AVRO_REGISTRY_PORT +PRODUCTION_KAFKA_POST +PRODUCTION_DEFAULT_PARTITIONS +PRODUCTION_DEFAULT_REPLICAS +PRODUCTION_DEFAULT_RETENTION +PRODUCTION_KAFKA_ADMIN_USERNAME +PRODUCTION_KAFKA_ADMIN_PASSWORD +PRODUCTION_KAFKA_LOCATION +PRODUCTION_KAFKA_HOSTNAME +PRODUCTION_ZK +PRODUCTION_KAFKA_AVRO_REGISTRY_LOCATION +PRODUCTION_KAFKA_AVRO_REGISTRY_HOSTNAME +PRODUCTION_KAFKA_AVRO_REGISTRY_PORT +BROKERDB +BROKERDBUSER +BROKERDBPASS +PORT +``` ## Build -* mvn deploy +``` +mvn dependency:resolve +mvn verify +mvn package +``` + +## Run + +``` +java -jar target/kafka-api-jar-with-dependencies.jar +``` + + + diff --git a/create.sql b/create.sql new file mode 100644 index 0000000..df279c7 --- /dev/null +++ b/create.sql @@ -0,0 +1,131 @@ +-- DDL generated by Postico 1.4.2 +-- Not all database features are supported. Do not use for backup. + +-- Table Definition ---------------------------------------------- + +CREATE TABLE IF NOT EXISTS provision_user ( + userid text DEFAULT uuid_generate_v4() PRIMARY KEY, + password text, + username text UNIQUE, + created_timestamp timestamp without time zone DEFAULT now(), + claimed boolean, + claimed_timestamp timestamp without time zone, + cluster text +); + +-- Indices ------------------------------------------------------- + +CREATE UNIQUE INDEX IF NOT EXISTS username_uniqu ON provision_user(username text_ops); +CREATE UNIQUE INDEX IF NOT EXISTS provision_user_pkey ON provision_user(userid text_ops); + + +-- DDL generated by Postico 1.4.2 +-- Not all database features are supported. Do not use for backup. + +-- Table Definition ---------------------------------------------- + +CREATE TABLE IF NOT EXISTS provision_topic ( + topicid text DEFAULT uuid_generate_v4() PRIMARY KEY, + topic text UNIQUE, + partitions integer, + replicas integer, + retentionms integer, + cleanuppolicy text, + created_timestamp timestamp without time zone DEFAULT now(), + cluster text, + organization text, + updated_timestamp timestamp without time zone, + description text +); + +-- Indices ------------------------------------------------------- + +CREATE UNIQUE INDEX IF NOT EXISTS topic_unique ON provision_topic(topic text_ops); +CREATE UNIQUE INDEX IF NOT EXISTS provision_topic_pkey ON provision_topic(topicid text_ops); + + +-- DDL generated by Postico 1.4.2 +-- Not all database features are supported. Do not use for backup. + +-- Table Definition ---------------------------------------------- + +CREATE TABLE IF NOT EXISTS provision_consumergroup ( + consumergroupid text DEFAULT uuid_generate_v4(), + userid text, + topicid text, + consumergroupname text, + active boolean, + created_timestamp timestamp without time zone DEFAULT now(), + updated_timestamp timestamp without time zone, + cluster text +); + + + +-- DDL generated by Postico 1.4.2 +-- Not all database features are supported. Do not use for backup. + +-- Table Definition ---------------------------------------------- + +CREATE TABLE IF NOT EXISTS provision_acl ( + aclid text DEFAULT uuid_generate_v4() PRIMARY KEY, + userid text, + topicid text, + role text, + created_timestamp timestamp without time zone DEFAULT now(), + cluster text, + CONSTRAINT acl_unique UNIQUE (userid, topicid, role) +); + +-- Indices ------------------------------------------------------- + +CREATE UNIQUE INDEX IF NOT EXISTS provision_acl_pkey ON provision_acl(aclid text_ops); +CREATE UNIQUE INDEX IF NOT EXISTS acl_unique ON provision_acl(userid text_ops,topicid text_ops,role text_ops); + + +-- DDL generated by Postico 1.4.2 +-- Not all database features are supported. Do not use for backup. + +-- Table Definition ---------------------------------------------- + +CREATE OR REPLACE VIEW credentials AS SELECT provision_user.username, + provision_user.password, + provision_topic.topic, + provision_acl.role, + provision_consumergroup.consumergroupname, + provision_user.cluster + FROM provision_consumergroup, + provision_acl, + provision_user, + provision_topic + WHERE provision_consumergroup.topicid = provision_acl.topicid AND provision_acl.userid = provision_user.userid AND provision_acl.topicid = provision_topic.topicid AND provision_acl.role = 'consumer'::text AND provision_user.cluster = provision_topic.cluster AND provision_topic.cluster = provision_acl.cluster AND provision_acl.cluster = provision_consumergroup.cluster AND provision_consumergroup.active = true +UNION ALL + SELECT provision_user.username, + provision_user.password, + provision_topic.topic, + provision_acl.role, + ''::text AS consumergroupname, + provision_user.cluster + FROM provision_acl, + provision_user, + provision_topic + WHERE provision_acl.userid = provision_user.userid AND provision_acl.topicid = provision_topic.topicid AND provision_user.cluster = provision_topic.cluster AND provision_topic.cluster = provision_acl.cluster AND provision_acl.role = 'producer'::text; + + + +-- DDL generated by Postico 1.4.2 +-- Not all database features are supported. Do not use for backup. + +-- Table Definition ---------------------------------------------- + +CREATE OR REPLACE VIEW consumergroups AS SELECT provision_user.username, + provision_topic.topic, + provision_consumergroup.consumergroupname, + provision_consumergroup.cluster + FROM provision_consumergroup, + provision_topic, + provision_user + WHERE provision_consumergroup.userid = provision_user.userid AND provision_consumergroup.topicid = provision_topic.topicid AND provision_consumergroup.active = true; + + + diff --git a/pom.xml b/pom.xml index 16aeab6..689a9f3 100644 --- a/pom.xml +++ b/pom.xml @@ -2,13 +2,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.octanner.cobra - oct-kafka-api + com.mycompany.app + my-app 1.0-SNAPSHOT jar - Kafka Tools / APIs - http://scc-gitlab-1.dev.octanner.net/murray.resinski/oct-kafka-api + Maven Quick Start Archetype + http://maven.apache.org @@ -18,7 +18,7 @@ maven-jar-plugin 2.4 - oct-kafka-api + kafka-api true @@ -47,7 +47,7 @@ package - oct-kafka-api + kafka-api jar-with-dependencies @@ -65,32 +65,55 @@ + + com.despegar + spark-test + 1.1.8 + + + org.apache.commons + commons-lang3 + 3.7 + + + com.googlecode.json-simple + json-simple + 1.1 + + + + postgresql + postgresql + 9.1-901-1.jdbc4 + + + org.apache.kafka kafka_2.11 - 0.9.0.1 + 1.1.0 org.apache.kafka kafka-clients - 0.9.0.1 + 1.1.0 org.apache.kafka kafka-log4j-appender - 0.9.0.1 + 1.1.0 org.apache.kafka kafka-tools - 0.9.0.1 + 1.1.0 @@ -171,44 +194,16 @@ - - com.fasterxml.jackson.core - jackson-annotations - 2.8.6 - - - - - - com.fasterxml.jackson.core - jackson-core - 2.8.6 - - - - - - com.google.code.gson - gson - 2.8.0 - - - - - - com.fasterxml.jackson.core - jackson-databind - 2.8.6 + org.slf4j + slf4j-api + 1.7.25 - - org.postgresql - postgresql - 9.4.1212 + org.slf4j + slf4j-log4j12 + 1.7.25 - - diff --git a/src/main/java/CustomException.java b/src/main/java/CustomException.java new file mode 100644 index 0000000..19be6fc --- /dev/null +++ b/src/main/java/CustomException.java @@ -0,0 +1,8 @@ + +public class CustomException extends Exception { + + public CustomException(String message) { + super("status~500~"+message); + } + +} \ No newline at end of file diff --git a/src/main/java/KafkaBroker.java b/src/main/java/KafkaBroker.java index ad59f68..ceb4e04 100644 --- a/src/main/java/KafkaBroker.java +++ b/src/main/java/KafkaBroker.java @@ -1,486 +1,923 @@ -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; +import java.util.*; +import java.io.File; +import java.io.FileNotFoundException; import kafka.admin.AdminUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; +import java.io.StringReader; +import java.sql.*; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -import java.sql.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Properties; -import java.util.UUID; - +import org.apache.log4j.*; +import org.apache.commons.lang3.*; +import spark.Request; +import spark.Response; +import javax.servlet.http.HttpServletResponse; +import static java.lang.Math.toIntExact; import static spark.Spark.*; - public class KafkaBroker { - public static String KAFKA_BROKERS; - public static String KAFKA_ZOOKEEPERS; - static class Instance { - public String KAFKA_BROKERS; - public String KAFKA_TOPIC; - public String KAFKA_ZOOKEEPERS; + static class Topic { + public String topic; + public String organization; + public Long retentionms; + public Long partitions; + public Long replicas; + public String cleanuppolicy; + public String description; } - static class InstancePayload { - public String plan; - public String billingcode; - - public boolean isValid() { - return plan != null && billingcode != null; - } + static class Consumergroup { + public String consumergroupname; + public String username; + public String topic; } - static class Tagspec { - public String resource; - public String name; - public String value; - - public boolean isValid() { - return resource != null && name != null && value != null; + public static void main(String[] args) { + Logger.getRootLogger().setLevel(Level.OFF); + try { + Connection conn = connectToDatabaseOrDie(); + File sql = new File("create.sql"); + executeSqlScript(conn, sql); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(1); } - } + port(Integer.parseInt(System.getenv("PORT"))); - static class Planspec { - public String small; - public String medium; - public String large; - } + post("/v1/kafka/cluster/:cluster/user/:username/topic/:topic/consumergroup/rotate", (req, res) -> { + String newgroup = ""; + try { + newgroup = rotateConsumergroup(req.params(":cluster"), req.params(":username"), req.params(":topic")); + } catch (Exception e) { + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; + } + res.status(200); + return "{\"rotate\":\"" + req.params(":topic") + "\", \"message\":\"" + newgroup + "\"}"; + }); - static class ErrorMessagespec { - public int status; - public String message; + get("/v1/kafka/cluster/:cluster/credentials/:username", (req, res) -> { + String username = req.params("username"); + String cluster = req.params("cluster"); + JSONObject main = new JSONObject(); + try { + Map credentials = getCredentials(username, cluster); + main.putAll(credentials); + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; + } + res.status(200); + return main.toJSONString(); + }); - public ErrorMessagespec(int status_, String message_) { - this.status = status_; - this.message = message_; - } - } + post("/v1/kafka/cluster/:cluster/user", (req, res) -> { + String cluster = req.params("cluster"); + String[] up; + String username = ""; + String password = ""; + try { + up = claimUser(cluster); + username = up[0]; + password = up[1]; + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; + } + res.status(200); + return "{\"username\":\"" + username + "\", \"password\":\"" + password + "\"}"; + }); - static class OKMessagespec { - public int status; - public String message; + get("/v1/kafka/cluster/:cluster/topic/:topic", (req, res) -> { + JSONObject main = new JSONObject(); + String topictoget = req.params(":topic"); + try { + String cluster = req.params("cluster"); + Topic n = getTopic(topictoget, cluster); + JSONObject t = new JSONObject(); + JSONObject tconfig = new JSONObject(); + tconfig.put("partitions", n.partitions); + tconfig.put("replicas", n.replicas); + tconfig.put("retentionms", n.retentionms); + tconfig.put("organization", n.organization); + tconfig.put("cleanuppolicy", n.cleanuppolicy); + t.put("config", tconfig); + t.put("name", n.topic); + t.put("description", n.description); + t.put("organization", n.organization); + main.put("topic", t); + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; + } + res.status(200); + return main.toJSONString(); + }); - public OKMessagespec(int status_, String message_) { - this.status = status_; - this.message = message_; - } - } + get("/v1/kafka/cluster/:cluster/topics", (req, res) -> { + JSONArray topiclist = new JSONArray(); + try { + String cluster = req.params("cluster"); + Vector topics = getTopics(cluster); + java.util.Iterator ti = topics.iterator(); + while (ti.hasNext()) { + JSONObject main = new JSONObject(); + Topic n = (Topic) ti.next(); + JSONObject t = new JSONObject(); + JSONObject tconfig = new JSONObject(); + tconfig.put("partitions", n.partitions); + tconfig.put("replicas", n.replicas); + tconfig.put("retentionms", n.retentionms); + tconfig.put("organization", n.organization); + tconfig.put("cleanuppolicy", n.cleanuppolicy); + t.put("config", tconfig); + t.put("name", n.topic); + t.put("description", n.description); + t.put("organization", n.organization); + main.put("topic", t); + topiclist.add(main); + } + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; - static class TagList { - public Object[] tags; + } + res.status(200); + return topiclist.toJSONString(); + }); - public TagList(Object - [] tags_) { - this.tags = tags_; - } - } - public KafkaBroker() { + get("/v1/kafka/cluster/:cluster/consumergroups", (req, res) -> { + JSONArray list = new JSONArray(); + try { + String cluster = req.params("cluster"); + Vector consumergroups = getConsumergroups(cluster); + java.util.Iterator cgi = consumergroups.iterator(); + while (cgi.hasNext()) { + Consumergroup n = cgi.next(); + JSONObject cgmain = new JSONObject(); + JSONObject cg = new JSONObject(); + cg.put("consumergroupname", n.consumergroupname); + cg.put("username", n.username); + cg.put("topic", n.topic); + cgmain.put("consumergroup", cg); + list.add(cgmain); + } + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; + } + res.status(200); + return list.toJSONString(); + }); - } + get("/v1/kafka/cluster/:cluster/topic/:topic/consumergroups", (req, res) -> { + JSONArray list = new JSONArray(); + try { + String cluster = req.params(":cluster"); + String topic = req.params(":topic"); + + Vector consumergroups = getConsumergroupsForTopic(cluster, topic); + java.util.Iterator cgi = consumergroups.iterator(); + while (cgi.hasNext()) { + Consumergroup n = cgi.next(); + JSONObject cgmain = new JSONObject(); + JSONObject cg = new JSONObject(); + cg.put("consumergroupname", n.consumergroupname); + cg.put("username", n.username); + cg.put("topic", n.topic); + cgmain.put("consumergroup", cg); + list.add(cgmain); + } + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; - public static void main(String[] args) { - KafkaBroker kb = new KafkaBroker(); - kb.runServer(); - } + } + res.status(200); + return list.toJSONString(); + }); + put("/v1/kafka/cluster/:cluster/acl/user/:user/topic/:topic/role/:role", (req, res) -> { + String result = ""; + String cluster = ""; + String topic = ""; + String user = ""; + String role = ""; + try { + + cluster = req.params(":cluster"); + topic = req.params(":topic"); + user = req.params(":user"); + role = req.params(":role"); + if (!role.equalsIgnoreCase("producer") && !role.equalsIgnoreCase("consumer")) { + res.status(400); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + + "role must be either producer or consumer" + "\"}"; + } + result = grantUserTopicRole(cluster, user, topic, role); + } catch (Exception e) { + e.printStackTrace(); + res.status(500); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; + } + res.status(200); + return "{\"acl\":\"" + user + "/" + topic + "/" + role + "\", \"message\":\"" + result + "\"}"; + }); + post("/v1/kafka/cluster/:cluster/topic", (req, res) -> { + String topicname = ""; + String result = ""; + try { + String cluster = req.params("cluster"); + String body = req.body(); + long replicas = Long.parseLong(System.getenv(cluster.toUpperCase() + "_DEFAULT_REPLICAS")); + JSONParser jsonParser = new JSONParser(); + StringReader sr = new StringReader(body); + JSONObject jsonObject = (JSONObject) jsonParser.parse(sr); + JSONObject topicelement = (JSONObject) jsonObject.get("topic"); + topicname = (String) topicelement.get("name"); + String description = (String) topicelement.get("description"); + String organization = (String) topicelement.get("organization"); + JSONObject configelements = (JSONObject) topicelement.get("config"); + long partitions = 0; + Set configkeys = (Set) configelements.keySet(); + if (configkeys.contains("partitions")) { + partitions = (Long) configelements.get("partitions"); + } else { + partitions = Long.parseLong(System.getenv(cluster.toUpperCase() + "_DEFAULT_PARTITIONS")); + } + long retentiontime = 0; + if (configkeys.contains("retention.ms")) { + retentiontime = (Long) configelements.get("retention.ms"); + } else { + retentiontime = Long.parseLong(System.getenv(cluster.toUpperCase() + "_DEFAULT_RETENTION")); + } + String cleanuppolicy = "delete"; + if (configkeys.contains("cleanup.policy")) { + cleanuppolicy = (String) configelements.get("cleanup.policy"); + } else { + cleanuppolicy = "delete"; + } - public void runServer() { + result = createTopic(cluster, topicname, partitions, replicas, retentiontime, cleanuppolicy, + organization, description); - KAFKA_BROKERS = System.getenv("KAFKA_BROKERS"); - KAFKA_ZOOKEEPERS = System.getenv("KAFKA_ZOOKEEPERS"); - Logger.getRootLogger().setLevel(Level.OFF); - port(Integer.parseInt(System.getenv("PORT"))); + if (result.equalsIgnoreCase("created")) { + res.status(201); + } else if (result.equalsIgnoreCase("error")) { + res.status(500); + } else if (result.equalsIgnoreCase("exists")) { + res.status(200); + } else { + res.status(500); + } + } catch (Exception e) { + e.printStackTrace(); + res.status(getStatusFromMessage(e.getMessage())); + return "{\"result\":\"" + "error" + "\", \"message\":\"" + fixException(e) + "\"}"; - post("/v1/kafka/instance", (req, res) -> { - ObjectMapper mapper = new ObjectMapper(); - InstancePayload instancepayload = mapper.readValue(req.body(), InstancePayload.class); - if (!instancepayload.isValid()) { - ErrorMessagespec ems = new ErrorMessagespec(400, "Bad Request"); - res.status(400); - res.type("application/json"); - return new Gson().toJson(ems); - } - System.out.println(instancepayload.billingcode); - System.out.println(instancepayload.plan); - Object result = provisionTLHandler(instancepayload); - if (result instanceof Instance) { - res.status(201); - res.type("application/json"); - return new Gson().toJson((Instance) result); - } else if (result instanceof ErrorMessagespec) { - res.status(((ErrorMessagespec) result).status); - res.type("application/json"); - return new Gson().toJson(result); - } else { - res.status(500); - res.type("application/json"); - return new Gson().toJson(new ErrorMessagespec(500, "Unknown Error")); } - }); + return "{\"topicrequested\":\"" + topicname + "\", \"message\":\"" + result + "\"}"; + }); - get("/v1/kafka/url/:name", (req, res) -> { - Object result = getUrlTLHandler(req.params(":name")); - if (result instanceof Instance) { + put("/v1/kafka/cluster/:cluster/topic/:topic/retentionms/:retentionms", (req, res) -> { + String topictoupdate = req.params(":topic"); + String retentionms = req.params(":retentionms"); + String cluster = req.params(":cluster"); + String result = updateTopicRetentionTime(cluster, topictoupdate, Long.parseLong(retentionms)); + if (result.equalsIgnoreCase("updated")) { res.status(200); - res.type("application/json"); - return new Gson().toJson(result); - } else if (result instanceof ErrorMessagespec) { - res.type("application/json"); - res.status(((ErrorMessagespec) result).status); - return new Gson().toJson(result); + } else if (result.equalsIgnoreCase("error")) { + res.status(500); + } else if (result.equalsIgnoreCase("does not exist")) { + res.status(500); } else { res.status(500); - res.type("application/json"); - return new Gson().toJson(new ErrorMessagespec(500, "Unknown Error")); } - }); + return "{\"topic\":\"" + topictoupdate + "\", \"message\":\"" + result + "\"}"; + }); - delete("/v1/kafka/instance/:topic", (req, res) -> { - Object result = deleteTopicTLHandler(req.params(":topic")); - if (result instanceof OKMessagespec) { + delete("/v1/kafka/cluster/:cluster/topic/:topic", (req, res) -> { + String cluster = req.params(":cluster"); + String topictodelete = req.params(":topic"); + String result = deleteTopic(cluster, topictodelete); + if (result.equalsIgnoreCase("deleted")) { res.status(200); - res.type("application/json"); - return new Gson().toJson(result); - } else if (result instanceof ErrorMessagespec) { - res.status(((ErrorMessagespec) result).status); - res.type("application/json"); - return new Gson().toJson(result); + } else if (result.equalsIgnoreCase("error")) { + res.status(500); + } else if (result.equalsIgnoreCase("does not exist")) { + res.status(500); } else { res.status(500); - res.type("application/json"); - return new Gson().toJson(new ErrorMessagespec(500, "Unknown Error")); } - }); + return "{\"topic\":\"" + topictodelete + "\", \"message\":\"" + result + "\"}"; + }); - get("/v1/kafka/plans", (req, res) -> { - Planspec plans = new Planspec(); - plans.small = "1 partition, 1 replica, 1 hour "; - plans.medium = "2 partitions, 2 replicas, 10 hours"; - plans.large = "6 partitions, 2 replicas, 5 days"; - res.status(200); - res.type("application/json"); - return new Gson().toJson(plans); + after((request, response) -> { + System.out.println(requestAndResponseInfoToString(request, response)); }); + } + public static String updateTopicRetentionTime(String cluster, String topic, Long retentionms) throws Exception { + String toreturn = ""; + ZkClient zkClient = null; + ZkUtils zkUtils = null; + try { + String zookeeperHosts = System.getenv(cluster.toUpperCase() + "_ZK"); + int sessionTimeOutInMs = 15 * 1000; + int connectionTimeOutInMs = 10 * 1000; - post("/v1/kafka/tag", (req, res) -> { - ObjectMapper mapper = new ObjectMapper(); - Tagspec tagobj = mapper.readValue(req.body(), Tagspec.class); - if (!tagobj.isValid()) { - ErrorMessagespec ems = new ErrorMessagespec(400, "Bad Request"); - res.status(400); - res.type("application/json"); - return new Gson().toJson(ems); - } - System.out.println(tagobj.resource); - System.out.println(tagobj.name); - System.out.println(tagobj.value); - Object result = addTagTLHandler(tagobj); - if (result instanceof OKMessagespec) { - res.status(200); - res.type("application/json"); - return new Gson().toJson(result); - } else if (result instanceof ErrorMessagespec) { - res.status(((ErrorMessagespec) result).status); - res.type("application/json"); - return new Gson().toJson(result); + zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, + ZKStringSerializer$.MODULE$); + zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); + + String topicName = topic; + boolean exists = AdminUtils.topicExists(zkUtils, topicName); + if (exists) { + Properties topicConfiguration = new Properties(); + topicConfiguration.setProperty("retention.ms", Long.toString(retentionms)); + AdminUtils.changeTopicConfig(zkUtils, topicName, topicConfiguration); + Connection conn = null; + conn = connectToDatabaseOrDie(); + String retentionmssql = "update provision_topic set retentionms = ?, updated_timestamp = now() where topic= ? and cluster = ?"; + PreparedStatement ustmt = conn.prepareStatement(retentionmssql); + ustmt.setLong(1, retentionms); + ustmt.setString(2, topic); + ustmt.setString(3, cluster); + ustmt.executeUpdate(); + ustmt.close(); + conn.close(); + + toreturn = "updated"; } else { - res.status(500); - res.type("application/json"); - return new Gson().toJson(new ErrorMessagespec(500, "Unknown Error")); + toreturn = "does not exist"; } - }); - get("/v1/kafka/tags/:topic", (req, res) -> { - Object result = getTagsTLHandler(req.params(":topic")); - if (result instanceof TagList) { - res.status(200); - res.type("application/json"); - return new Gson().toJson(result); - } else if (result instanceof ErrorMessagespec) { - res.status(((ErrorMessagespec) result).status); - res.type("application/json"); - return new Gson().toJson(result); - } else { - res.status(500); - res.type("application/json"); - return new Gson().toJson(new ErrorMessagespec(500, "Unknown Error")); + } catch (Exception ex) { + ex.printStackTrace(); + toreturn = "error " + ex.getMessage(); + return toreturn; + } finally { + if (zkClient != null) { + zkClient.close(); } - }); - } - - public Object provisionTLHandler(InstancePayload instance) { - Object toreturn; - - String uuid = UUID.randomUUID().toString(); - String nameprefix = System.getenv("NAME_PREFIX"); - String name = nameprefix + uuid.split("-")[0]; - System.out.println(name); - String partitions = null; - String replicas = null; - String timetokeep = null; - if (instance.plan.equalsIgnoreCase("small")) { - partitions = "1"; - replicas = "1"; - timetokeep = "3600000"; // 1 hour - } - if (instance.plan.equalsIgnoreCase("medium")) { - partitions = "2"; - replicas = "2"; - timetokeep = "36000000"; //10 hours - } - if (instance.plan.equalsIgnoreCase("large")) { - partitions = "6"; - replicas = "2"; - timetokeep = "432000000"; //5 days } + return toreturn; + } + public static String createTopic(String cluster, String args, long partitions, long replicas, long retentionms, + String cleanuppolicy, String organization, String description) throws Exception { + ZkClient zkClient = null; + ZkUtils zkUtils = null; + String toreturn = ""; try { - createTopic(name, partitions, replicas, timetokeep); - } catch (Exception e) { - e.printStackTrace(); - ErrorMessagespec errormessage = new ErrorMessagespec(500, e.getMessage()); - toreturn = errormessage; - return toreturn; - } - try { - insertNew(name, instance.plan); - } catch (Exception e) { - e.printStackTrace(); - ErrorMessagespec errormessage = new ErrorMessagespec(500, e.getMessage()); - toreturn = errormessage; - return toreturn; - } + String zookeeperHosts = System.getenv(cluster.toUpperCase() + "_ZK"); + int sessionTimeOutInMs = 15 * 1000; + int connectionTimeOutInMs = 10 * 1000; - try { - Object result = getUrlTLHandler(name); - toreturn = result; - } catch (Exception e) { - e.printStackTrace(); - ErrorMessagespec errormessage = new ErrorMessagespec(500, e.getMessage()); - toreturn = errormessage; - return toreturn; + zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, + ZKStringSerializer$.MODULE$); + zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); + String topicName = args; + int noOfPartitions = toIntExact(partitions); + int noOfReplication = toIntExact(replicas); + Properties topicConfiguration = new Properties(); + topicConfiguration.setProperty("retention.ms", Long.toString(retentionms)); + topicConfiguration.setProperty("cleanup.policy", cleanuppolicy); + boolean exists = AdminUtils.topicExists(zkUtils, topicName); + if (!exists) { + AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, + kafka.admin.RackAwareMode.Disabled$.MODULE$); + toreturn = "created"; + } else { + return "exists"; + + } + Connection conn = null; + conn = connectToDatabaseOrDie(); + String inserttopic = "insert into provision_topic (topic, partitions, replicas, retentionms, cleanuppolicy, cluster, organization,description ) values (?,?,?,?,?,?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(inserttopic); + stmt.setString(1, topicName); + stmt.setLong(2, partitions); + stmt.setLong(3, replicas); + stmt.setLong(4, retentionms); + stmt.setString(5, cleanuppolicy); + stmt.setString(6, cluster); + stmt.setString(7, organization); + stmt.setString(8, description); + + stmt.executeUpdate(); + stmt.close(); + conn.close(); + } catch (Exception e) { + throwWithStatus(e, "500"); + } finally { + if (zkClient != null) { + zkClient.close(); + } } return toreturn; - } - private Object getUrlTLHandler(String name) { + public static Topic getTopic(String topicname, String cluster) throws Exception { - Instance instance = new Instance(); - instance.KAFKA_BROKERS = KAFKA_BROKERS; - instance.KAFKA_TOPIC = name; - instance.KAFKA_ZOOKEEPERS = KAFKA_ZOOKEEPERS; - return instance; + Topic toreturn = new Topic(); + try { + Connection conn = null; + conn = connectToDatabaseOrDie(); + String gettopics = "select topic, partitions, replicas, retentionms, cleanuppolicy, organization, description from provision_topic where cluster = ? and topic = ? limit 1"; + PreparedStatement stmt = conn.prepareStatement(gettopics); + stmt.setString(1, cluster); + stmt.setString(2, topicname); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + toreturn.topic = rs.getString(1); + toreturn.partitions = rs.getLong(2); + toreturn.replicas = rs.getLong(3); + toreturn.retentionms = rs.getLong(4); + toreturn.cleanuppolicy = rs.getString(5); + toreturn.organization = rs.getString(6); + toreturn.description = rs.getString(7); + } + stmt.close(); + conn.close(); + } catch (Exception ex) { + ex.printStackTrace(); + } + return toreturn; } - private Connection connectToDatabaseOrDie() { - Connection conn = null; + public static Vector getTopics(String cluster) throws Exception { + + java.util.Vector toreturn = new Vector(); try { - Class.forName("org.postgresql.Driver"); - String url = System.getenv("BROKERDB"); - conn = DriverManager.getConnection(url, System.getenv("BROKERDBUSER"), System.getenv("BROKERDBPASS")); + Connection conn = null; + conn = connectToDatabaseOrDie(); + String gettopics = "select topic, partitions, replicas, retentionms, cleanuppolicy, organization, description from provision_topic where cluster = ? order by organization, topic"; + PreparedStatement stmt = conn.prepareStatement(gettopics); + stmt.setString(1, cluster); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + Topic topic = new Topic(); + topic.topic = rs.getString(1); + topic.partitions = rs.getLong(2); + topic.replicas = rs.getLong(3); + topic.retentionms = rs.getLong(4); + topic.cleanuppolicy = rs.getString(5); + topic.organization = rs.getString(6); + topic.description = rs.getString(7); + toreturn.add(topic); + } + stmt.close(); } catch (Exception e) { - e.printStackTrace(); + throwWithStatus(e, "500"); } - return conn; + return toreturn; } - - public void createTopic(String args, String partitions, String replicas, String timetokeep) throws Exception { - ZkClient zkClient = null; - ZkUtils zkUtils = null; - Object toreturn; + public static String setACL(String cluster, String topic, String user, String role, String consumergroupname) + throws Exception { + String toreturn = ""; try { - String zookeeperHosts = KAFKA_ZOOKEEPERS; - int sessionTimeOutInMs = 15 * 1000; - int connectionTimeOutInMs = 10 * 1000; - - zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); - zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); + String zk = System.getenv(cluster.toUpperCase() + "_ZK"); + if (role.equalsIgnoreCase("producer")) { + String[] cmdPArm = { "--authorizer-properties", "zookeeper.connect=" + zk, "--add", "--allow-principal", + "User:" + user, "--" + role, "--topic", topic }; + kafka.admin.AclCommand.main(cmdPArm); + toreturn = "created"; + } - String topicName = args; - int noOfPartitions = Integer.parseInt(partitions); - ; - int noOfReplication = Integer.parseInt(replicas); - Properties topicConfiguration = new Properties(); - topicConfiguration.setProperty("retention.ms", timetokeep); + if (role.equalsIgnoreCase("consumer")) { + String[] cmdPArm = { "--authorizer-properties", "zookeeper.connect=" + zk, "--add", "--allow-principal", + "User:" + user, "--" + role, "--group", "*", "--topic", topic }; + kafka.admin.AclCommand.main(cmdPArm); + toreturn = "created"; + } + } catch (Exception e) { + throwWithStatus(e, "500"); + } + return toreturn; + } - AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); + public static Vector getConsumergroups(String cluster) throws Exception { - } catch (Exception ex) { - throw ex; - } finally { - if (zkClient != null) { - zkClient.close(); + Vector toreturn = new Vector(); + try { + Connection conn = null; + conn = connectToDatabaseOrDie(); + String gettopics = "select username, topic, consumergroupname from consumergroups where cluster = ?"; + PreparedStatement stmt = conn.prepareStatement(gettopics); + stmt.setString(1, cluster); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + Consumergroup cg = new Consumergroup(); + cg.username = rs.getString(1); + cg.topic = rs.getString(2); + cg.consumergroupname = rs.getString(3); + toreturn.add(cg); } + stmt.close(); + } catch (Exception e) { + throwWithStatus(e, "500"); } - + return toreturn; } - public Object deleteTopicTLHandler(String args) { - Object toreturn; + public static String deleteTopic(String cluster, String args) { + String zookeeperHosts = System.getenv(cluster.toUpperCase() + "_ZK"); + String toreturn = ""; ZkClient zkClient = null; ZkUtils zkUtils = null; try { - String zookeeperHosts = KAFKA_ZOOKEEPERS; int sessionTimeOutInMs = 15 * 1000; int connectionTimeOutInMs = 10 * 1000; - zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); + zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, + ZKStringSerializer$.MODULE$); zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); String topicName = args; System.out.println("about to delete " + topicName); - AdminUtils.deleteTopic(zkUtils, topicName); - deleteRow(topicName); - + boolean exists = AdminUtils.topicExists(zkUtils, topicName); + if (exists) { + AdminUtils.deleteTopic(zkUtils, topicName); + toreturn = "deleted"; + } else { + toreturn = "does not exist"; + } } catch (Exception ex) { - toreturn = new ErrorMessagespec(500, ex.getMessage()); - return toreturn; + ex.printStackTrace(); + toreturn = "error - " + ex.getMessage(); } finally { if (zkClient != null) { zkClient.close(); } } - return new OKMessagespec(200, "Topic deleted"); + return toreturn; } - private void insertNew(String name, String plan) throws Exception { + private static Connection connectToDatabaseOrDie() throws Exception { Connection conn = null; - conn = connectToDatabaseOrDie(); try { - String getinstance = "insert into provision (name,plan,claimed) values (?,?,?)"; - PreparedStatement stmt = conn.prepareStatement(getinstance); - stmt.setString(1, name); - stmt.setString(2, plan); - stmt.setString(3, "yes"); - stmt.executeUpdate(); + Class.forName("org.postgresql.Driver"); + String url = System.getenv("BROKERDB"); + conn = DriverManager.getConnection(url, System.getenv("BROKERDBUSER"), System.getenv("BROKERDBPASS")); + } catch (Exception e) { + throwWithStatus(e, "500"); + } + return conn; + } + private static String[] claimUser(String cluster) throws Exception { + String username = ""; + String password = ""; + Connection conn = null; + conn = connectToDatabaseOrDie(); + try { + String claimquery = "select username,password from provision_user where claimed=false and cluster = ? limit 1"; + PreparedStatement stmt = conn.prepareStatement(claimquery); + stmt.setString(1, cluster); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + username = rs.getString(1); + password = rs.getString(2); + } stmt.close(); + String claimupdate = "update provision_user set claimed = true, claimed_timestamp = now() where username= ? and cluster = ?"; + PreparedStatement ustmt = conn.prepareStatement(claimupdate); + ustmt.setString(1, username); + ustmt.setString(2, cluster); + ustmt.executeUpdate(); + ustmt.close(); conn.close(); } catch (Exception e) { - throw e; + throwWithStatus(e, "500"); } - } + return new String[] { username, password }; + } - private void deleteRow(String name) throws Exception { - Connection conn = null; - conn = connectToDatabaseOrDie(); + private static String grantUserTopicRole(String cluster, String username, String topic, String role) + throws Exception { + String toreturn = ""; try { - String deleteinstance = "delete from provision where name = ?"; - PreparedStatement stmt = conn.prepareStatement(deleteinstance); - stmt.setString(1, name); - int updated = stmt.executeUpdate(); - System.out.println(updated); + UUID idOne = UUID.randomUUID(); + String consumergroupname = username + "-" + idOne.toString().split("-")[0]; + toreturn = setACL(cluster, topic, username, role, consumergroupname); + Connection conn = null; + conn = connectToDatabaseOrDie(); + String insertacl = "insert into provision_acl (userid,topicid,role, cluster) values (?,?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(insertacl); + stmt.setString(1, getUseridByName(cluster, username)); + stmt.setString(2, getTopicidByName(cluster, topic)); + stmt.setString(3, role); + stmt.setString(4, cluster); + stmt.executeUpdate(); stmt.close(); + + if (role.equalsIgnoreCase("consumer")) { + String insertcg = "insert into provision_consumergroup (userid, topicid, consumergroupname, active, cluster) values (?,?,?,?,?)"; + PreparedStatement stmt2 = conn.prepareStatement(insertcg); + stmt2.setString(1, getUseridByName(cluster, username)); + stmt2.setString(2, getTopicidByName(cluster, topic)); + stmt2.setString(3, consumergroupname); + stmt2.setBoolean(4, true); + stmt2.setString(5, cluster); + stmt2.executeUpdate(); + stmt2.close(); + } conn.close(); } catch (Exception e) { - throw e; + throwWithStatus(e, "500"); } + return toreturn; } + public static String[] removeNulls(String args[]) { + List list = new ArrayList(); + + for (String s : args) { + if (s != null && s.length() > 0) { + list.add(s); + } + } + + return list.toArray(new String[list.size()]); + } + + private static String getTopicidByName(String cluster, String name) throws Exception{ + String id = ""; - private Object addTagTLHandler(Tagspec tag) { - Object toreturn; try { - Object[] existingtags = getTags(tag.resource); - Object[] newtags = appendValue(existingtags, tag); - updateTagsDB(tag.resource, newtags); + Connection conn = null; + conn = connectToDatabaseOrDie(); + String getinstance = "select topicid from provision_topic where topic = ? and cluster = ?"; + PreparedStatement stmt = conn.prepareStatement(getinstance); + stmt.setString(1, name); + stmt.setString(2, cluster); + ResultSet rs = stmt.executeQuery(); + int rows = 0; + while (rs.next()) { + rows++; + id = rs.getString(1); + } + stmt.close(); + conn.close(); + if (rows != 1) { + //return "": + throw new CustomException("topic does not exist"); + } } catch (Exception e) { e.printStackTrace(); - return new ErrorMessagespec(500, "Error Adding Tag"); + throw e; } - return new OKMessagespec(201, "Tag Added"); + return id; } - private Object getTagsTLHandler(String name) { - Object toreturn; + private static String getUseridByName(String cluster, String name) { + String id = ""; + try { - TagList tl = new TagList(getTags(name)); - toreturn = tl; + Connection conn = null; + conn = connectToDatabaseOrDie(); + String getinstance = "select userid from provision_user where username = ? and cluster = ?"; + PreparedStatement stmt = conn.prepareStatement(getinstance); + stmt.setString(1, name); + stmt.setString(2, cluster); + ResultSet rs = stmt.executeQuery(); + int rows = 0; + while (rs.next()) { + rows++; + id = rs.getString(1); + } + stmt.close(); + conn.close(); + if (rows != 1) { + return ""; + } } catch (Exception e) { - e.printStackTrace(); - toreturn = new ErrorMessagespec(500, e.getMessage()); - return toreturn; } - return toreturn; - + return id; } - private Object[] getTags(String name) throws Exception { - Object[] tagsa = new Tagspec[0]; - Connection conn = null; - conn = connectToDatabaseOrDie(); + + private static Map getCredentials(String username, String cluster) throws Exception { + String password = ""; + Map credentials = new HashMap(); + java.util.Vector producerlist = new Vector(); + java.util.Vector consumerlist = new Vector(); + try { - String tagsquery = "select tags from provision where name=?"; - PreparedStatement stmt = conn.prepareStatement(tagsquery); - stmt.setString(1, name); + Connection conn = null; + conn = connectToDatabaseOrDie(); + String getinstance = "select password, topic, role, consumergroupname from credentials where username = ? and cluster = ?"; + PreparedStatement stmt = conn.prepareStatement(getinstance); + stmt.setString(1, username); + stmt.setString(2, cluster); ResultSet rs = stmt.executeQuery(); + while (rs.next()) { - String tagjson = rs.getString(1); - if (tagjson != null) { - JsonArray jsonArray = new JsonParser().parse(tagjson).getAsJsonArray(); - for (int i = 0; i < jsonArray.size(); i++) { - Gson gson = new Gson(); - JsonElement str = jsonArray.get(i); - Tagspec obj = gson.fromJson(str, Tagspec.class); - System.out.println(obj); - System.out.println(str); - tagsa = appendValue(tagsa, obj); - System.out.println("-------"); - } - } else { + password = rs.getString(1); + String topic = rs.getString(2); + String role = rs.getString(3); + String consumergroupname = rs.getString(4); + + if (role.equalsIgnoreCase("producer")) { + producerlist.add(topic); + } + if (role.equalsIgnoreCase("consumer")) { + consumerlist.add(topic); + credentials.put("KAFKA_CG_" + topic.toUpperCase().replace('.', '_'), consumergroupname); + } } + stmt.close(); + conn.close(); + credentials.put("KAFKA_HOSTNAME", System.getenv(cluster.toUpperCase() + "_KAFKA_HOSTNAME")); + credentials.put("KAFKA_PORT", System.getenv(cluster.toUpperCase() + "_KAFKA_PORT")); + credentials.put("KAFKA_LOCATION", System.getenv(cluster.toUpperCase() + "_KAFKA_LOCATION")); + credentials.put("KAFKA_AVRO_REGISTRY_LOCATION", + System.getenv(cluster.toUpperCase() + "_KAFKA_AVRO_REGISTRY_LOCATION")); + credentials.put("KAFKA_AVRO_REGISTRY_HOSTNAME", + System.getenv(cluster.toUpperCase() + "_KAFKA_AVRO_REGISTRY_HOSTNAME")); + credentials.put("KAFKA_AVRO_REGISTRY_PORT", + System.getenv(cluster.toUpperCase() + "_KAFKA_AVRO_REGISTRY_PORT")); + credentials.put("KAFKA_USERNAME", username); + credentials.put("KAFKA_PASSWORD", password); + credentials.put("KAFKA_RESOURCENAME", cluster); + credentials.put("KAFKA_SASL_MECHANISM", "PLAIN"); + credentials.put("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"); + if (producerlist.size() > 0) { + String plist = StringUtils.join(producerlist, ','); + credentials.put("KAFKA_PRODUCE_LIST", plist); + } + if (consumerlist.size() > 0) { + String clist = StringUtils.join(consumerlist, ','); + credentials.put("KAFKA_CONSUME_LIST", clist); + } + } catch (Exception e) { throw e; } - return tagsa; + return credentials; } - private Object[] appendValue(Object[] obj, Tagspec newObj) { + private static Vector getConsumergroupsForTopic(String cluster, String topic) throws Exception { + Vector toreturn = new Vector(); - ArrayList temp = new ArrayList(Arrays.asList(obj)); - temp.add(newObj); - return temp.toArray(); + try { + getTopicidByName(cluster, topic); + Connection conn = null; + conn = connectToDatabaseOrDie(); + String gettopics = "select username, topic, consumergroupname from consumergroups where cluster = ? and topic = ?"; + PreparedStatement stmt = conn.prepareStatement(gettopics); + stmt.setString(1, cluster); + stmt.setString(2, topic); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + Consumergroup cg = new Consumergroup(); + cg.username = rs.getString(1); + cg.topic = rs.getString(2); + cg.consumergroupname = rs.getString(3); + toreturn.add(cg); + } + stmt.close(); + } catch (Exception e) { + System.out.println(e.getMessage()); + throwWithStatus(e, "500"); + } + return toreturn; + } + private static String requestAndResponseInfoToString(Request request, Response response) { + StringBuilder sb = new StringBuilder(); + sb.append(request.requestMethod()); + sb.append(" " + request.url()); + HttpServletResponse raw = response.raw(); + sb.append(" status=" + raw.getStatus()); + String body = request.body(); + String newbody = body.trim().replaceAll(" +", " "); + sb.append(" " + newbody); + return sb.toString(); } - private void updateTagsDB(String name, Object[] tagsa) throws Exception { - Connection conn = null; - conn = connectToDatabaseOrDie(); + private static String rotateConsumergroup(String cluster, String username, String topic) throws Exception { + UUID idOne = UUID.randomUUID(); + String consumergroupname = username + "-" + idOne.toString().split("-")[0]; try { - String getinstance = "update provision set tags = to_json(?::json) where name= ?"; - PreparedStatement stmt = conn.prepareStatement(getinstance); - Gson gson = new Gson(); - stmt.setString(1, gson.toJson(tagsa)); - stmt.setString(2, name); - stmt.executeUpdate(); - stmt.close(); + String userid = getUseridByName(cluster, username); + String topicid = getTopicidByName(cluster, topic); + + Connection conn = null; + conn = connectToDatabaseOrDie(); + String deactivate = "update provision_consumergroup set active = false, updated_timestamp = now() where cluster = ? and userid = ? and topicid = ? and active = true"; + PreparedStatement ustmt = conn.prepareStatement(deactivate); + ustmt.setString(1, cluster); + ustmt.setString(2, userid); + ustmt.setString(3, topicid); + ustmt.executeUpdate(); + int updatecount = ustmt.getUpdateCount(); + System.out.println(updatecount); + if (updatecount != 1) { + ustmt.close(); + throw new CustomException( + "Update count was not equal to 1. Please verify values of user, topic, and cluser. Or user/topic/role was not a consumer"); + } + + String insertcg = "insert into provision_consumergroup (userid, topicid, consumergroupname, active, cluster) values (?,?,?,?,?)"; + PreparedStatement stmt2 = conn.prepareStatement(insertcg); + stmt2.setString(1, userid); + stmt2.setString(2, topicid); + stmt2.setString(3, consumergroupname); + stmt2.setBoolean(4, true); + stmt2.setString(5, cluster); + stmt2.executeUpdate(); + int insertcount = stmt2.getUpdateCount(); + System.out.println(insertcount); + if (updatecount != 1) { + ustmt.close(); + throw new CustomException( + "Insert count was not equal to 1. Please verify values of user, topic, and cluser. Or user/topic/role was not a consumer"); + } + stmt2.close(); conn.close(); + } catch (Exception e) { - throw e; + e.printStackTrace(); + throwWithStatus(e, "500"); } + return consumergroupname; + } + + private static String fixException(Exception e) { + String stripped = e.getMessage().replace("\"", "").replace('\n', '.'); + String[] splitted = stripped.split("~"); + return splitted[splitted.length-1]; + } + + private static void throwWithStatus(Exception e, String status) throws Exception { + throw new Exception("status~" + status + "~" + e.getMessage(), e); + } + + private static int getStatusFromMessage(String message) { + int toreturn = 501; + if (message.startsWith("status")) { + toreturn = Integer.parseInt(message.split("~")[1]); + } + return toreturn; } -} + private static void executeSqlScript(Connection conn, File inputFile) throws Exception { + String delimiter = ";"; + Scanner scanner; + try { + scanner = new Scanner(inputFile).useDelimiter(delimiter); + } catch (FileNotFoundException e1) { + e1.printStackTrace(); + return; + } + + Statement currentStatement = null; + while (scanner.hasNext()) { + String rawStatement = scanner.next() + delimiter; + try { + currentStatement = conn.createStatement(); + currentStatement.execute(rawStatement); + } catch (SQLException e) { + e.printStackTrace(); + throw e; + } finally { + if (currentStatement != null) { + try { + currentStatement.close(); + } catch (SQLException e) { + e.printStackTrace(); + throw e; + } + } + currentStatement = null; + } + } + scanner.close(); + } +}