-
Notifications
You must be signed in to change notification settings - Fork 44
Leidy Sphinx Task List API #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,28 @@ | ||
from flask import Flask | ||
from .db import db, migrate | ||
from .models import task, goal | ||
from .routes.task_routes import bp as tasks_bp | ||
from .routes.goal_routes import bp as goals_bp | ||
import os | ||
|
||
def create_app(config=None): | ||
app = Flask(__name__) | ||
|
||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False | ||
#app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql+psycopg2://postgres:postgres@localhost:5432/task_list_api_development' | ||
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('SQLALCHEMY_DATABASE_URI') | ||
|
||
|
||
if config: | ||
# Merge `config` into the app's configuration | ||
# to override the app's default settings for testing | ||
app.config.update(config) | ||
|
||
db.init_app(app) | ||
migrate.init_app(app, db) | ||
migrate.init_app(app,db) | ||
|
||
# Register Blueprints here | ||
app.register_blueprint(tasks_bp) | ||
app.register_blueprint(goals_bp) | ||
|
||
return app |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,38 @@ | ||
from sqlalchemy.orm import Mapped, mapped_column | ||
from sqlalchemy.orm import Mapped, mapped_column,relationship | ||
from ..db import db | ||
from typing import TYPE_CHECKING, Optional | ||
if TYPE_CHECKING: | ||
from .task import Task | ||
|
||
class Goal(db.Model): | ||
__tablename__ = 'goal' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would automatically be set to |
||
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) | ||
title: Mapped[str] = mapped_column(nullable=False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want a column to be |
||
tasks: Mapped[Optional[list["Task"]]] = relationship(back_populates="goal") | ||
|
||
@classmethod | ||
def from_dict(cls, data): | ||
# Creates a Task instance from a dictionary | ||
|
||
return cls( | ||
title=data["title"], | ||
) | ||
|
||
def to_dict(self, include_name=True, tasks_ids=False): | ||
# Converts a Task instance to a dictionary | ||
goal_dict = { | ||
|
||
"id": self.id, | ||
"title": self.title, | ||
} | ||
|
||
if tasks_ids: | ||
tasks_ids_list = [task.id for task in self.tasks] | ||
goal_dict["task_ids"] = tasks_ids_list | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have an endpoint that will provide us with the same data that |
||
goal_dict.pop("title") | ||
|
||
if include_name: | ||
return {Goal.__name__.lower(): goal_dict} | ||
Comment on lines
+34
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! We could even make this more general and maybe add it to the |
||
|
||
return goal_dict | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,46 @@ | ||
from sqlalchemy.orm import Mapped, mapped_column | ||
from sqlalchemy.orm import Mapped, mapped_column,relationship | ||
from datetime import datetime | ||
from typing import Optional | ||
from sqlalchemy import String, Text, DateTime, ForeignKey | ||
from ..db import db | ||
from typing import TYPE_CHECKING | ||
if TYPE_CHECKING: | ||
from .goal import Goal | ||
|
||
class Task(db.Model): | ||
__tablename__ = 'task' | ||
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) | ||
title: Mapped[str] = mapped_column(nullable=False) # Title column required | ||
description: Mapped[str] = mapped_column(nullable=False) # Description column required | ||
completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True, default=None) | ||
is_complete: Mapped[Optional[bool]] = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
goal_id: Mapped[Optional[int]] = mapped_column((ForeignKey("goal.id")), default=None) | ||
goal: Mapped[Optional["Goal"]] = relationship(back_populates="tasks") | ||
|
||
@classmethod | ||
def from_dict(cls, data): | ||
"""Creates a Task instance from a dictionary""" | ||
|
||
return cls( | ||
title=data["title"], | ||
description=data["description"], | ||
completed_at=data.get("completed_at") # Use .get() since "completed_at" is optional | ||
) | ||
|
||
def to_dict(self, include_name=True, goal_id=False): | ||
"""Converts a Task instance to a dictionary""" | ||
task_dict = { | ||
"id": self.id, | ||
"title": self.title, | ||
"description": self.description, | ||
"is_complete": bool(self.completed_at) # Convert to boolean for completeness | ||
} | ||
if goal_id: | ||
task_dict["goal_id"] = self.goal.id | ||
|
||
if include_name: | ||
return {Task.__name__.lower(): task_dict} | ||
|
||
return task_dict | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,100 @@ | ||
from flask import Blueprint | ||
from flask import Blueprint, request, abort, jsonify | ||
from app.models.goal import Goal | ||
from app.models.task import Task | ||
from app.routes.route_utilities import validate_model | ||
from ..db import db | ||
|
||
bp = Blueprint("bp",__name__,url_prefix="/goals") | ||
|
||
@bp.post("") | ||
def create_goal(): | ||
request_body = request.get_json() | ||
|
||
if "title" not in request_body: | ||
return {"details": "Invalid data"}, 400 | ||
|
||
title = request_body["title"] | ||
new_goal = Goal(title=title) | ||
|
||
db.session.add(new_goal) | ||
db.session.commit() | ||
Comment on lines
+13
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic looks very similar to the logic we wrote in our create routes, what refactoring could be done here to move this code out of our routes into a helper function to support maintainability and scalability? (Hint: We did a refactor like this in Flasky). |
||
|
||
response = new_goal.to_dict() | ||
return response, 201 | ||
|
||
@bp.get("") | ||
def get_all_goals(): | ||
#execute the query statement and retrieve the models | ||
query = db.select(Goal) #select records from db Model | ||
|
||
query = query.order_by(Goal.id) | ||
goals = db.session.scalars(query) #retrieve the records | ||
|
||
response = [] | ||
for goal in goals: | ||
response.append(goal.to_dict(include_name=False)) | ||
return response | ||
|
||
@bp.get("/<goal_id>") | ||
def get_one_goal(goal_id): | ||
goal = validate_model(Goal,goal_id) | ||
return goal.to_dict() | ||
|
||
@bp.put("/<goal_id>") | ||
def update_one_goal_by_id(goal_id): | ||
goal = validate_model(Goal,goal_id) #record with id = goal_id | ||
request_body = request.get_json() | ||
goal.title = request_body["title"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this code similar to our other |
||
db.session.commit() | ||
return goal.to_dict() | ||
|
||
@bp.delete("/<goal_id>") | ||
def delete_goal_by_id(goal_id): | ||
goal = validate_model(Goal,goal_id) | ||
|
||
#Delete the goal | ||
db.session.delete(goal) | ||
db.session.commit() | ||
|
||
response = { | ||
"details": f"Goal {goal.id} \"{goal.title}\" successfully deleted" | ||
} | ||
return response, 200 | ||
|
||
@bp.post("/<goal_id>/tasks") | ||
def add_tasks_to_goal(goal_id): | ||
|
||
goal = validate_model(Goal, goal_id) | ||
request_body = request.get_json() | ||
|
||
task_ids = request_body.get("task_ids") | ||
if not task_ids: | ||
response = {"details": "Invalid data"} | ||
abort(response, 400) | ||
|
||
#valid_task_ids = [] | ||
for task_id in task_ids: | ||
task = validate_model(Task,task_id) | ||
# assign each task to goal_id | ||
task.goal_id = goal_id | ||
|
||
response_body = goal.to_dict(include_name=False, tasks_ids=True) | ||
db.session.commit() | ||
|
||
return response_body , 200 | ||
|
||
@bp.get("/<goal_id>/tasks") | ||
def get_goals_and_tasks(goal_id): | ||
|
||
goal = validate_model(Goal, goal_id) | ||
|
||
# Get associated tasks and convert each to a dictionary | ||
task_dicts = [task.to_dict(include_name=False , goal_id=True) for task in goal.tasks] | ||
|
||
response = { | ||
"id": goal.id, | ||
"title": goal.title, | ||
"tasks": task_dicts # Populate with list of task dictionaries | ||
} | ||
|
||
return response |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from flask import abort, make_response, request | ||
from ..db import db | ||
import os | ||
import requests | ||
|
||
# Slack webhook URL and token from environment variables | ||
SLACK_URL = "https://slack.com/api/chat.postMessage" | ||
SLACK_TOKEN = os.environ.get("SLACK_BOT_TOKEN") | ||
|
||
def validate_model(cls, model_id): | ||
try: | ||
model_id = int(model_id) | ||
except ValueError: | ||
response = {"message": f"{cls.__name__} {model_id} invalid"} | ||
abort(make_response(response, 400)) | ||
|
||
#execute the query statement and retrieve the models | ||
query = db.select(cls).where(cls.id == model_id) #select records with an id = model_id | ||
model = db.session.scalar(query) #retrieve only one record model_id | ||
|
||
if not model: | ||
response = {"message": f"{cls.__name__} {model_id} not found"} | ||
abort(make_response(response, 404)) | ||
|
||
return model | ||
|
||
def get_models_with_filters(cls, filters=None): | ||
query = db.select(cls) | ||
|
||
if filters: | ||
for attribute, value in filters.items(): | ||
if hasattr(cls, attribute): | ||
query = query.where(getattr(cls, attribute).ilike(f"%{value}%")) | ||
|
||
models = db.session.scalars(query.order_by(cls.id)) | ||
models_response = [model.to_dict() for model in models] | ||
return models_response | ||
|
||
def send_slack_message(message): | ||
# Sends a message to the Slack channel using the Slack API. | ||
|
||
headers = { | ||
"Authorization": f"Bearer {SLACK_TOKEN}", | ||
"Content-Type": "application/json" | ||
} | ||
payload = { | ||
"channel": "tasks-api-notifications", | ||
"text": message | ||
} | ||
response = requests.post(SLACK_URL, headers=headers, json=payload) | ||
response.raise_for_status() # Raise an error if the request fails | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ⭐️ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,104 @@ | ||
from flask import Blueprint | ||
from flask import Blueprint, request, abort, make_response, Response | ||
from app.models.task import Task | ||
from datetime import datetime | ||
from app.routes.route_utilities import validate_model | ||
from ..db import db | ||
from app.routes.route_utilities import send_slack_message | ||
|
||
bp = Blueprint("task_bp", __name__,url_prefix="/tasks") | ||
|
||
@bp.post("") | ||
def create_task(): | ||
request_body = request.get_json() | ||
|
||
try: | ||
new_task = Task.from_dict(request_body) | ||
except: | ||
response = {"details": "Invalid data"} | ||
abort(make_response(response, 400)) | ||
|
||
db.session.add(new_task) | ||
db.session.commit() | ||
|
||
return new_task.to_dict(), 201 | ||
|
||
@bp.get("") | ||
def get_all_tasks(): | ||
|
||
query = db.select(Task) #select records from db Model | ||
|
||
# Check for sorting parameter and apply | ||
sorting_param = request.args.get("sort", "asc") | ||
|
||
if sorting_param == "desc": | ||
query = query.order_by(Task.title.desc()) | ||
else: | ||
query = query.order_by(Task.title) | ||
|
||
#query = query.order_by(Task.id)#select records | ||
tasks = db.session.scalars(query) #retrieve the records | ||
|
||
response =[] | ||
for task in tasks: | ||
response.append(task.to_dict(include_name=False)) | ||
Comment on lines
+28
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this sorting logic be placed inside of our |
||
return response | ||
|
||
@bp.get("/<task_id>") | ||
def get_one_task_by_id(task_id): | ||
|
||
task = validate_model(Task,task_id) | ||
|
||
if task.goal_id: | ||
return task.to_dict(goal_id=True) | ||
else: | ||
return task.to_dict() | ||
|
||
@bp.put("/<task_id>") | ||
def update_one_task_by_id(task_id): | ||
|
||
task = validate_model(Task,task_id) #record with id = task_id | ||
request_body = request.get_json() | ||
|
||
#Update the task | ||
task.title = request_body["title"] | ||
task.description = request_body["description"] | ||
|
||
db.session.commit() #save the changes to db | ||
|
||
return task.to_dict() | ||
|
||
@bp.patch("/<task_id>/<task_status>") | ||
def task_status(task_id, task_status): | ||
|
||
task = validate_model(Task,task_id) #record with id = task_id | ||
|
||
# Update task status based on the task_status value | ||
if task_status == "mark_complete": | ||
task.completed_at = datetime.now() | ||
|
||
|
||
# Send Slack notification | ||
send_slack_message(f"Someone just completed the task '{task.title}'") | ||
|
||
elif task_status == "mark_incomplete": | ||
task.completed_at = None # Set to None to indicate incomplete | ||
else: | ||
# Return error response for invalid task_status | ||
return {"error": "Invalid task status provided"}, 400 | ||
|
||
db.session.commit() #save the changes to db | ||
return task.to_dict() | ||
Comment on lines
+70
to
+90
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice work here! ⭐️ |
||
|
||
|
||
@bp.delete("/<task_id>") | ||
def delete_task_by_id(task_id): | ||
task = validate_model(Task,task_id) | ||
|
||
#Delete the task | ||
db.session.delete(task) | ||
db.session.commit() | ||
|
||
response = { | ||
"details": f"Task {task.id} \"{task.title}\" successfully deleted" | ||
} | ||
return response, 200 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Single-database configuration for Flask. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You would want to make sure to delete a comment like this so you don't expose your database connection string.