diff --git a/conf/lambda.json b/conf/lambda.json
index 844802aea..c8953a909 100644
--- a/conf/lambda.json
+++ b/conf/lambda.json
@@ -54,7 +54,7 @@
     "concurrency_limit": 10,
     "memory": 128,
     "timeout": 300,
-    "file_format": null,
+    "file_format": "parquet",
     "log_level": "info"
   },
   "classifier_config": {},
diff --git a/conf/schemas/cloudtrail.json b/conf/schemas/cloudtrail.json
index b6a99be2b..b23ac9651 100644
--- a/conf/schemas/cloudtrail.json
+++ b/conf/schemas/cloudtrail.json
@@ -22,16 +22,19 @@
   "cloudtrail:events": {
     "schema": {
       "additionalEventData": {},
+      "addendum": {},
       "apiVersion": "string",
       "awsRegion": "string",
       "errorCode": "string",
       "errorMessage": "string",
+      "eventCategory": "string",
       "eventID": "string",
       "eventName": "string",
       "eventSource": "string",
       "eventTime": "string",
       "eventType": "string",
       "eventVersion": "string",
+      "insightDetails": {},
       "managementEvent": "boolean",
       "readOnly": "boolean",
       "recipientAccountId": "string",
@@ -39,9 +42,11 @@
       "requestParameters": {},
       "resources": [],
       "responseElements": {},
+      "sessionCredentialFromConsole": "boolean",
       "serviceEventDetails": {},
       "sharedEventID": "string",
       "sourceIPAddress": "string",
+      "tlsDetails": {},
       "userAgent": "string",
       "userIdentity": {},
       "vpcEndpointId": "string"
@@ -50,17 +55,22 @@
     "configuration": {
       "json_path": "Records[*]",
       "optional_top_level_keys": [
+        "addendum",
         "additionalEventData",
        "apiVersion",
         "errorCode",
         "errorMessage",
+        "eventCategory",
+        "insightDetails",
         "managementEvent",
         "requestID",
         "readOnly",
         "resources",
         "serviceEventDetails",
+        "sessionCredentialFromConsole",
         "sharedEventID",
         "sourceIPAddress",
+        "tlsDetails",
         "userAgent",
         "vpcEndpointId"
       ]
diff --git a/docs/source/getting-started.rst b/docs/source/getting-started.rst
index 1a6d25c86..3398af153 100644
--- a/docs/source/getting-started.rst
+++ b/docs/source/getting-started.rst
@@ -103,10 +103,6 @@ Deploy
   python manage.py configure aws_account_id 111111111111  # Replace with your 12-digit AWS account ID
   python manage.py configure prefix  # Choose a unique name prefix (alphanumeric characters only)
 
-.. note::
-
-  * Update the ``file_format`` value in ``conf/lambda.json``. Valid options are ``parquet`` or ``json``. The default value will be parquet in a future release, but this must be manually configured at this time.
-
 .. code-block:: bash
 
   "athena_partitioner_config": {
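With `file_format` now defaulting to `parquet` and the manual-setup note above removed, an existing deployment can still sanity-check its configuration before deploying. A minimal sketch, assuming the standard `conf/lambda.json` layout shown above:

```python
import json

# Hypothetical pre-deploy check; key names follow conf/lambda.json above
with open('conf/lambda.json') as conf_file:
    lambda_conf = json.load(conf_file)

file_format = lambda_conf['athena_partitioner_config'].get('file_format')
assert file_format in ('parquet', 'json'), (
    'file_format must be "parquet" or "json", got: {!r}'.format(file_format)
)
```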
diff --git a/docs/source/outputs.rst b/docs/source/outputs.rst
index 10033e06c..4b59013b2 100644
--- a/docs/source/outputs.rst
+++ b/docs/source/outputs.rst
@@ -45,7 +45,7 @@ Adding a new configuration for a currently supported service is handled using `
 ``<SERVICE>`` above should be one of the following supported service identifiers.
 
 ``aws-cloudwatch-log``, ``aws-firehose``, ``aws-lambda``, ``aws-lambda-v2``, ``aws-s3``,
-``aws-sns``, ``aws-sqs``, ``carbonblack``, ``github``, ``jira``, ``komand``, ``pagerduty``,
+``aws-sns``, ``aws-sqs``, ``carbonblack``, ``github``, ``jira``, ``jira-v2``, ``komand``, ``pagerduty``,
 ``pagerduty-incident``, ``pagerduty-v2``, ``phantom``, ``slack``
 
 For example:
diff --git a/requirements.txt b/requirements.txt
index 33fc294c8..e4d70868b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -39,10 +39,10 @@ boto==2.49.0
 botocore==1.17.29
 cachetools==4.1.1
 certifi==2020.6.20
-cffi==1.14.1
+cffi==1.14.4
 cfn-lint==0.34.0
 chardet==3.0.4
-cryptography==3.0
+cryptography==3.2
 decorator==4.4.2
 docker==4.2.2
 docopt==0.6.2
@@ -55,7 +55,7 @@ google-api-core==1.22.0
 google-auth==1.19.2
 google-auth-httplib2==0.0.4
 googleapis-common-protos==1.52.0
-httplib2==0.18.1
+httplib2==0.19.0
 idna==2.8
 imagesize==1.2.0
 importlib-metadata==1.7.0
diff --git a/rules/community/cloudwatch_events/cloudtrail_critical_api_calls.py b/rules/community/cloudwatch_events/cloudtrail_critical_api_calls.py
index a3c77f0c7..fb975de4e 100644
--- a/rules/community/cloudwatch_events/cloudtrail_critical_api_calls.py
+++ b/rules/community/cloudwatch_events/cloudtrail_critical_api_calls.py
@@ -11,6 +11,7 @@
     'DeleteCluster',
     # CloudTrail
     'DeleteTrail',
+    'PutEventSelectors',
     'UpdateTrail',
     'StopLogging',
     # AWS Config
diff --git a/streamalert/__init__.py b/streamalert/__init__.py
index 461eff87c..ad9304418 100644
--- a/streamalert/__init__.py
+++ b/streamalert/__init__.py
@@ -1,2 +1,2 @@
 """StreamAlert version."""
-__version__ = '3.4.0'
+__version__ = '3.4.1'
diff --git a/streamalert/alert_processor/outputs/jira.py b/streamalert/alert_processor/outputs/jira.py
index b10362f2d..406ebc34e 100644
--- a/streamalert/alert_processor/outputs/jira.py
+++ b/streamalert/alert_processor/outputs/jira.py
@@ -44,6 +44,7 @@ class JiraOutput(OutputDispatcher):
     def __init__(self, *args, **kwargs):
         OutputDispatcher.__init__(self, *args, **kwargs)
         self._base_url = None
+        self._verify_ssl = False
         self._auth_cookie = None
 
     @classmethod
@@ -76,9 +77,11 @@ def get_user_defined_properties(cls):
              OutputProperty(description='the Jira password',
                             mask_input=True,
                             cred_requirement=True)),
+            # Example: "https://jira.mywebsite.com"
             ('url',
-             OutputProperty(description='the Jira url',
+             OutputProperty(description='the Jira REST API base url',
                            mask_input=True,
+                            input_restrictions={' '},  # override the defaults, which would reject the ':' in a URL
                             cred_requirement=True)),
             ('project_key',
              OutputProperty(description='the Jira project key',
@@ -92,6 +95,41 @@ def get_user_defined_properties(cls):
              OutputProperty(description='the Jira aggregation behavior to aggregate '
                                         'alerts by rule name (yes/no)',
                             mask_input=False,
+                            cred_requirement=True)),
+            # When aggregation is enabled, it will fuzzy-search any JIRA ticket that best-matches
+            # the "summary ~ ..." statement, within the project key. For each matching rule,
+            # instead of creating new JIRA tasks over and over, it will instead opt to append a
+            # comment to a similar(ish) JIRA task.
+            #
+            # However, this can result in very long-lived JIRA tickets getting tons of comments
+            # appended on. This optional parameter allows users to specify an additional JQL
+            # clause to filter out these older tickets, encouraging new JIRA tasks to be created
+            # from time to time. It can also be used to increase the accuracy of finding the
+            # parent task (maybe filtering on a component) in case you find the StreamAlert
+            # integration is appending comments to unrelated issues.
+            #
+            # Example: A highly effective JQL suffix is "created > startOfWeek(-1w)"
+            ('aggregation_additional_jql',
+             OutputProperty(description='when aggregation is enabled, provide additional JQL '
+                                        'clause to filter out older/outdated issues',
+                            mask_input=False,
+                            input_restrictions={},
+                            cred_requirement=True)),
+            ('ssl_verify',
+             OutputProperty(description='do clientside ssl cert verification (yes/no)',
+                            mask_input=False,
+                            cred_requirement=True)),
+            # For example, if your JIRA project requires a custom field called "custom_field_1",
+            # you can set the following json-encoded string in this:
+            #   {"custom_field_1": {"value": "FooBar"}}
+            #
+            # These fields are DEFAULT values. You can still override them using the
+            # @jira.additional_fields publisher parameter.
+            ('additional_required_issue_fields',
+             OutputProperty(description='when a JIRA project has additional required fields, '
+                                        'provide them here, as a json-encoded string',
+                            mask_input=False,
+                            input_restrictions={},
                             cred_requirement=True))
         ])
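Because `additional_required_issue_fields` is entered as a JSON-encoded string, one way to produce a valid value is to build the dict in Python and serialize it; the custom field name below is hypothetical (look yours up via the `/issue/createmeta` endpoint mentioned in the docstring further down):

```python
import json

# 'customfield_10042' is a made-up field ID for illustration only
required_fields = {
    'customfield_10042': {'value': 'FooBar'},
}
# Paste this string when prompted for additional_required_issue_fields
print(json.dumps(required_fields))
```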
@@ -127,7 +165,7 @@ def _search_jira(self, jql, fields=None, max_results=100, validate_query=True):
             resp = self._get_request_retry(search_url,
                                            params=params,
                                            headers=self._get_headers(),
-                                           verify=False)
+                                           verify=self._verify_ssl)
         except OutputRequestFailure:
             return []
 
@@ -152,7 +190,7 @@ def _create_comment(self, issue_id, comment):
             resp = self._post_request_retry(comment_url,
                                             data={'body': comment},
                                             headers=self._get_headers(),
-                                            verify=False)
+                                            verify=self._verify_ssl)
         except OutputRequestFailure:
             return False
 
@@ -175,7 +213,7 @@ def _get_comments(self, issue_id):
         try:
             resp = self._get_request_retry(comment_url,
                                            headers=self._get_headers(),
-                                           verify=False)
+                                           verify=self._verify_ssl)
         except OutputRequestFailure:
             return []
 
@@ -185,17 +223,24 @@ def _get_comments(self, issue_id):
 
         return response.get('comments', [])
 
-    def _get_existing_issue(self, issue_summary, project_key):
+    def _get_existing_issue(self, issue_summary, project_key, additional_jql):
         """Find an existing Jira issue based on the issue summary
 
         Args:
             issue_summary (str): The Jira issue summary
             project_key (str): The Jira project to search
+            additional_jql (str): Additional JQL statement to filter by
 
         Returns:
             int: ID of the found issue or False if existing issue does not exist
         """
-        jql = 'summary ~ "{}" and project="{}"'.format(issue_summary, project_key)
+        jql = 'summary ~ "{}" and project="{}"{}'.format(
+            issue_summary,
+            project_key,
+            " AND {}".format(additional_jql) if additional_jql else ""
+        )
+
+        LOGGER.debug('Aggregation using JQL: (%s)', jql)
         resp = self._search_jira(jql, fields=['id', 'summary'], max_results=1)
         jira_id = False
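For reference, the JQL assembled by the new `_get_existing_issue` looks like this when the example suffix from the property comment is configured; all input values here are illustrative:

```python
# Example inputs (hypothetical rule name, project key, and JQL suffix)
issue_summary = 'StreamAlert cloudtrail_critical_api_calls'
project_key = 'SECOPS'
additional_jql = 'created > startOfWeek(-1w)'

jql = 'summary ~ "{}" and project="{}"{}'.format(
    issue_summary,
    project_key,
    ' AND {}'.format(additional_jql) if additional_jql else ''
)
print(jql)
# summary ~ "StreamAlert cloudtrail_critical_api_calls" and project="SECOPS" AND created > startOfWeek(-1w)
```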
@@ -206,7 +251,7 @@ def _get_existing_issue(self, issue_summary, project_key):
 
         return jira_id
 
-    def _create_issue(self, summary, project_key, issue_type, description):
+    def _create_issue(self, summary, project_key, issue_type, description, additional_fields):
         """Create a Jira issue to write alerts to. Alert is written to the "description"
         field of an issue.
 
@@ -215,6 +260,11 @@ def _create_issue(self, summary, project_key, issue_type, description):
             project_key (str): The Jira project key which issues will be associated with
             issue_type (str): The type of issue being created
             description (str): The body of text which describes the issue
+            additional_fields (dict):
+                Additional fields to set with the integration. This can vary greatly from
+                project to project, so be wary of which fields are available. You can use the
+                /issue/createmeta?projectKeys=CSIRT endpoint to discover which fields are available
+                (and which ones are required) for your specific project.
 
         Returns:
             int: ID of the created issue or False if unsuccessful
@@ -229,14 +279,15 @@ def _create_issue(self, summary, project_key, issue_type, description):
                 'description': description,
                 'issuetype': {
                     'name': issue_type
-                }
+                },
+                **additional_fields
             }
         }
         try:
             resp = self._post_request_retry(issue_url,
                                             data=issue_body,
                                             headers=self._get_headers(),
-                                            verify=False)
+                                            verify=self._verify_ssl)
         except OutputRequestFailure:
             return False
 
@@ -264,7 +315,7 @@ def _establish_session(self, username, password):
             resp = self._post_request_retry(login_url,
                                             data=auth_info,
                                             headers=self._get_default_headers(),
-                                            verify=False)
+                                            verify=self._verify_ssl)
         except OutputRequestFailure:
             LOGGER.error("Failed to authenticate to Jira")
             return False
@@ -291,6 +342,18 @@ def _dispatch(self, alert, descriptor):
             so it supports their custom markdown-like formatting and respects newline
             characters (e.g. \n).
 
+            - @jira.additional_fields (dict):
+                A structure of additional fields to add to Create Issue API calls. For example,
+                if you have a custom field for severity, you could specify it in this dict
+                like so:
+
+                {
+                    "custom_field_1122": {
+                        "value": "Low"
+                    }
+                }
+
         Args:
             alert (Alert): Alert instance which triggered a rule
             descriptor (str): Output descriptor
@@ -307,7 +370,11 @@ def _dispatch(self, alert, descriptor):
         # Presentation defaults
         default_issue_summary = 'StreamAlert {}'.format(alert.rule_name)
         default_alert_body = '{{code:JSON}}{}{{code}}'.format(
-            json.dumps(publication, sort_keys=True)
+            json.dumps(
+                publication,
+                indent=2,
+                sort_keys=True,
+            )
         )
 
         # True Presentation values
@@ -318,6 +385,7 @@ def _dispatch(self, alert, descriptor):
         comment_id = None
 
         self._base_url = creds['url']
+        self._verify_ssl = creds.get('ssl_verify', '').lower() == 'yes'
         self._auth_cookie = self._establish_session(creds['username'], creds['password'])
 
         # Validate successful authentication
@@ -327,7 +395,11 @@ def _dispatch(self, alert, descriptor):
         # If aggregation is enabled, attempt to add alert to an existing issue. If a
         # failure occurs in this block, creation of a new Jira issue will be attempted.
         if creds.get('aggregate', '').lower() == 'yes':
-            issue_id = self._get_existing_issue(issue_summary, creds['project_key'])
+            issue_id = self._get_existing_issue(
+                issue_summary,
+                creds['project_key'],
+                creds.get('aggregation_additional_jql', '')
+            )
             if issue_id:
                 comment_id = self._create_comment(issue_id, description)
                 if comment_id:
@@ -340,10 +412,22 @@ def _dispatch(self, alert, descriptor):
                               issue_id)
 
         # Create a new Jira issue
+        required_fields_json = creds.get('additional_required_issue_fields')
+        additional_required_fields = (
+            json.loads(required_fields_json)
+            if required_fields_json
+            else {}
+        )
+
+        additional_fields = {
+            **additional_required_fields,
+            **publication.get('@jira.additional_fields', {}),
+        }
         issue_id = self._create_issue(issue_summary,
                                       creds['project_key'],
                                       creds['issue_type'],
-                                      description)
+                                      description,
+                                      additional_fields)
         if issue_id:
             LOGGER.debug('Sending alert to a new Jira issue %s', issue_id)
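Note the merge order above: the publisher-supplied dict is unpacked last, so publisher values win over the configured `additional_required_issue_fields` defaults. A sketch of such a publisher, assuming the function-style `@Register` publisher API from `streamalert.shared.publisher` and a hypothetical custom field ID:

```python
from streamalert.shared.publisher import Register


@Register
def jira_severity_fields(alert, publication):  # pylint: disable=unused-argument
    # 'custom_field_1122' is illustrative; substitute your project's real field ID
    publication['@jira.additional_fields'] = {
        'custom_field_1122': {'value': 'Low'},
    }
    return publication
```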
diff --git a/streamalert/alert_processor/outputs/jira_v2.py b/streamalert/alert_processor/outputs/jira_v2.py
new file mode 100644
index 000000000..a7949be59
--- /dev/null
+++ b/streamalert/alert_processor/outputs/jira_v2.py
@@ -0,0 +1,341 @@
+"""
+Copyright 2017-present, Airbnb Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import OrderedDict
+import os
+import json
+import base64
+
+from streamalert.alert_processor.helpers import compose_alert
+from streamalert.alert_processor.outputs.output_base import (
+    OutputDispatcher,
+    OutputProperty,
+    OutputRequestFailure,
+    StreamAlertOutput
+)
+from streamalert.shared.logger import get_logger
+
+
+LOGGER = get_logger(__name__)
+
+
+@StreamAlertOutput
+class JiraOutput(OutputDispatcher):
+    """JiraOutput handles all alert dispatching for Jira"""
+    __service__ = 'jira-v2'
+
+    DEFAULT_HEADERS = {"Accept": "application/json"}
+    # Relative paths: a leading '/' would cause os.path.join to discard the base url
+    SEARCH_ENDPOINT = 'rest/api/2/search'
+    ISSUE_ENDPOINT = 'rest/api/2/issue'
+    COMMENT_ENDPOINT = 'rest/api/2/issue/{}/comment'
+
+    def __init__(self, *args, **kwargs):
+        OutputDispatcher.__init__(self, *args, **kwargs)
+        self._base_url = None
+        self._api_key = None
+        self._user_name = None
+
+    @classmethod
+    def get_user_defined_properties(cls):
+        """Get properties that must be assigned by the user when configuring a new Jira
+        output. This should be sensitive or unique information for this use-case that needs
+        to come from the user.
+
+        Every output should return a dict that contains a 'descriptor' with a description of the
+        integration being configured.
+
+        Jira requires an API key, URL, project key, and issue type for alert dispatching.
+        These values should be masked during input and are credential requirements.
+
+        An additional parameter 'aggregate' is used to determine if alerts are aggregated into a
+        single Jira issue based on the StreamAlert rule.
+
+        Returns:
+            OrderedDict: Contains various OutputProperty items
+        """
+        return OrderedDict([
+            ('descriptor',
+             OutputProperty(description='a short and unique descriptor for this '
+                                        'Jira integration')),
+            ('api_key',
+             OutputProperty(
+                 description='the Jira api key, '
+                             'generated at https://id.atlassian.com/manage/api-tokens',
+                 mask_input=True,
+                 cred_requirement=True)),
+            ('user_name',
+             OutputProperty(
+                 description='the Jira username for the api key, '
+                             'example: "username@company.com"',
+                 mask_input=False,
+                 cred_requirement=True)),
+            ('url',
+             OutputProperty(
+                 description='the base URL of your Jira instance, '
+                             'example: https://company.atlassian.net',
+                 mask_input=False,
+                 input_restrictions={},
+                 cred_requirement=True)),
+            ('project_key',
+             OutputProperty(description='the Jira project KEY where issues should be created',
+                            mask_input=False,
+                            cred_requirement=True)),
+            ('issue_type',
+             OutputProperty(description='the Jira issue type; please note this is case '
+                                        'sensitive, example: "Task", not "task"',
+                            mask_input=False,
+                            cred_requirement=True)),
+            ('aggregate',
+             OutputProperty(description='the Jira aggregation behavior to aggregate '
+                                        'alerts by rule name (yes/no)',
+                            mask_input=False,
+                            cred_requirement=True))
+        ])
+
+    @classmethod
+    def _get_default_headers(cls):
+        """Class method to be used to pass the default headers"""
+        return cls.DEFAULT_HEADERS.copy()
+
+    def _get_headers(self):
+        """Instance method used to pass the default headers plus the api key"""
+        auth_token = "%s:%s" % (self._user_name, self._api_key)
+        encoded_credentials = base64.b64encode(auth_token.encode())
+        return dict(self._get_default_headers(),
+                    **{"Authorization": "Basic %s" % encoded_credentials.decode()})
+
+    def _load_creds(self, descriptor):
+        """Loads a dict of credentials relevant to this output descriptor
+
+        Args:
+            descriptor (str): unique identifier used to look up these credentials
+
+        Returns:
+            dict: the loaded credential info needed for sending alerts to this service
+                or None if nothing gets loaded
+        """
+        return self._credentials_provider.load_credentials(descriptor)
+
+    def _search_jira(self, jql, fields=None, max_results=100, validate_query=True):
+        """Search Jira for issues using a JQL query
+
+        Args:
+            jql (str): The JQL query
+            fields (list): List of fields to return for each issue
+            max_results (int): Maximum number of results to return
+            validate_query (bool): Whether to validate the JQL query or not
+
+        Returns:
+            list: list of issues matching JQL query
+        """
+        search_url = os.path.join(self._base_url, self.SEARCH_ENDPOINT)
+        params = {
+            'jql': jql,
+            'maxResults': max_results,
+            'validateQuery': validate_query,
+            'fields': fields
+        }
+        try:
+            resp = self._get_request_retry(search_url,
+                                           params=params,
+                                           headers=self._get_headers(),
+                                           verify=False)
+        except OutputRequestFailure:
+            return []
+
+        response = resp.json()
+        if not response:
+            return []
+
+        return response.get('issues', [])
+
+    def _create_comment(self, issue_id, comment):
+        """Add a comment to an existing issue
+
+        Args:
+            issue_id (str): The existing issue ID or key
+            comment (str): The body of the comment
+
+        Returns:
+            int: ID of the created comment or False if unsuccessful
+        """
+        comment_url = os.path.join(self._base_url, self.COMMENT_ENDPOINT.format(issue_id))
+        try:
+            resp = self._post_request_retry(comment_url,
+                                            data={'body': comment},
+                                            headers=self._get_headers(),
+                                            verify=False)
+        except OutputRequestFailure:
+            return False
+
+        response = resp.json()
+        if not response:
+            return False
+
+        return response.get('id', False)
+
+    def _get_comments(self, issue_id):
+        """Get all comments for an existing Jira issue
+
+        Args:
+            issue_id (str): The existing issue ID or key
+
+        Returns:
+            list: List of comments associated with a Jira issue
+        """
+        comment_url = os.path.join(self._base_url, self.COMMENT_ENDPOINT.format(issue_id))
+        try:
+            resp = self._get_request_retry(comment_url,
+                                           headers=self._get_headers(),
+                                           verify=False)
+        except OutputRequestFailure:
+            return []
+
+        response = resp.json()
+        if not response:
+            return []
+
+        return response.get('comments', [])
+
+    def _get_existing_issue(self, issue_summary, project_key):
+        """Find an existing Jira issue based on the issue summary
+
+        Args:
+            issue_summary (str): The Jira issue summary
+            project_key (str): The Jira project to search
+
+        Returns:
+            int: ID of the found issue or False if existing issue does not exist
+        """
+        jql = 'summary ~ "{}" and project="{}"'.format(issue_summary, project_key)
+        resp = self._search_jira(jql, fields=['id', 'summary'], max_results=1)
+        jira_id = False
+
+        try:
+            jira_id = int(resp[0]['id'])
+        except (IndexError, KeyError):
+            LOGGER.debug('Existing Jira issue not found')
+
+        return jira_id
+
+    def _create_issue(self, summary, project_key, issue_type, description):
+        """Create a Jira issue to write alerts to. Alert is written to the "description"
+        field of an issue.
+
+        Args:
+            summary (str): The name of the Jira issue
+            project_key (str): The Jira project key which issues will be associated with
+            issue_type (str): The type of issue being created
+            description (str): The body of text which describes the issue
+
+        Returns:
+            int: ID of the created issue or False if unsuccessful
+        """
+        issue_url = os.path.join(self._base_url, self.ISSUE_ENDPOINT)
+        issue_body = {
+            'fields': {
+                'project': {
+                    'key': project_key
+                },
+                'summary': summary,
+                'description': description,
+                'issuetype': {
+                    'name': issue_type
+                }
+            }
+        }
+        try:
+            resp = self._post_request_retry(issue_url,
+                                            data=issue_body,
+                                            headers=self._get_headers(),
+                                            verify=False)
+        except OutputRequestFailure:
+            return False
+
+        response = resp.json()
+        if not response:
+            return False
+
+        return response.get('id', False)
+
+    def _dispatch(self, alert, descriptor):
+        """Send alert to Jira
+
+        Publishing:
+            This output uses a default issue summary and sends the entire publication into the
+            issue body as a {{code}} block. To override:
+
+            - @jira.issue_summary (str):
+                Overrides the issue title that shows up at the top on the JIRA UI
+
+            - @jira.description (str):
+                Send your own custom description. Remember: This field is in JIRA's syntax,
+                so it supports their custom markdown-like formatting and respects newline
+                characters (e.g. \n).
+
+        Args:
+            alert (Alert): Alert instance which triggered a rule
+            descriptor (str): Output descriptor
+
+        Returns:
+            bool: True if alert was sent successfully, False otherwise
+        """
+        creds = self._load_creds(descriptor)
+        if not creds:
+            return False
+
+        publication = compose_alert(alert, self, descriptor)
+
+        # Presentation defaults
+        default_issue_summary = 'StreamAlert {}'.format(alert.rule_name)
+        default_alert_body = '{{code:JSON}}{}{{code}}'.format(
+            json.dumps(publication, sort_keys=True)
+        )
+
+        # True Presentation values
+        issue_summary = publication.get('@jira.issue_summary', default_issue_summary)
+        description = publication.get('@jira.description', default_alert_body)
+
+        issue_id = None
+        comment_id = None
+
+        self._base_url = creds['url']
+        self._api_key = creds['api_key']
+        self._user_name = creds['user_name']
+
+        # If aggregation is enabled, attempt to add alert to an existing issue. If a
+        # failure occurs in this block, creation of a new Jira issue will be attempted.
+        if creds.get('aggregate', '').lower() == 'yes':
+            issue_id = self._get_existing_issue(issue_summary, creds['project_key'])
+            if issue_id:
+                comment_id = self._create_comment(issue_id, description)
+                if comment_id:
+                    LOGGER.debug('Sending alert to an existing Jira issue %s with comment %s',
+                                 issue_id,
+                                 comment_id)
+                    return True
+                LOGGER.error('Encountered an error when adding alert to existing '
+                             'Jira issue %s. Attempting to create new Jira issue.',
+                             issue_id)
+
+        # Create a new Jira issue
+        issue_id = self._create_issue(issue_summary,
+                                      creds['project_key'],
+                                      creds['issue_type'],
+                                      description)
+        if issue_id:
+            LOGGER.debug('Sending alert to a new Jira issue %s', issue_id)
+
+        return bool(issue_id or comment_id)
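`_get_headers` above amounts to standard HTTP basic auth with the Atlassian API token as the password. A standalone equivalent for verifying the header format (credentials are placeholders):

```python
import base64

user_name = 'username@company.com'  # placeholder
api_key = 'xxxx-api-token'          # placeholder Atlassian API token

auth_token = '%s:%s' % (user_name, api_key)
encoded = base64.b64encode(auth_token.encode()).decode()
headers = {'Accept': 'application/json', 'Authorization': 'Basic %s' % encoded}
# requests would build the identical Authorization header from auth=(user_name, api_key)
```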
diff --git a/streamalert/alert_processor/outputs/victorops.py b/streamalert/alert_processor/outputs/victorops.py
new file mode 100644
index 000000000..cdcd26fdd
--- /dev/null
+++ b/streamalert/alert_processor/outputs/victorops.py
@@ -0,0 +1,115 @@
+"""
+Copyright 2017-present Airbnb, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import OrderedDict
+
+from streamalert.alert_processor.helpers import compose_alert
+from streamalert.alert_processor.outputs.output_base import (
+    OutputDispatcher,
+    OutputProperty,
+    StreamAlertOutput
+)
+from streamalert.shared.logger import get_logger
+
+LOGGER = get_logger(__name__)
+
+
+@StreamAlertOutput
+class VictorOpsOutput(OutputDispatcher):
+    """VictorOpsOutput handles all alert dispatching for VictorOps"""
+    __service__ = 'victorops'
+
+    # Change the default request timeout for just this output
+    _DEFAULT_REQUEST_TIMEOUT = 10
+
+    @classmethod
+    def get_user_defined_properties(cls):
+        """Get properties that must be assigned by the user when configuring a new VictorOps
+        output. This should be sensitive or unique information for this use-case that needs
+        to come from the user.
+
+        Every output should return a dict that contains a 'descriptor' with a description of the
+        integration being configured.
+
+        Returns:
+            OrderedDict: Contains various OutputProperty items
+        """
+        return OrderedDict([
+            ('descriptor',
+             OutputProperty(description='A short and unique descriptor for this '
+                                        'VictorOps integration')),
+            ('victorops_api_id',
+             OutputProperty(description='The API Id for this VictorOps integration.',
+                            mask_input=True,
+                            cred_requirement=True)),
+            ('victorops_api_key',
+             OutputProperty(description='The API Key for this VictorOps integration.',
+                            mask_input=True,
+                            cred_requirement=True)),
+            ('url',
+             OutputProperty(description='The endpoint url for this VictorOps integration.',
+                            mask_input=True,
+                            cred_requirement=True)),
+            ('routing_key',
+             OutputProperty(description='The endpoint routing key for this VictorOps integration.',
+                            mask_input=True,
+                            cred_requirement=True))
+        ])
+
+    def _dispatch(self, alert, descriptor):
+        """Send alert to VictorOps
+
+        Publishing:
+            By default this output sends the current publication to VictorOps.
+            There is no "magic" field to "override" it: Simply publish what you want to send!
+
+        Args:
+            alert (Alert): Alert instance which triggered a rule
+            descriptor (str): Output descriptor
+
+        Returns:
+            bool: True if alert was sent successfully, False otherwise
+        """
+        creds = self._load_creds(descriptor)
+        if not creds:
+            return False
+
+        publication = compose_alert(alert, self, descriptor)
+
+        headers = {
+            'Content-Type': 'application/json',
+            'X-VO-Api-Id': creds['victorops_api_id'],
+            'X-VO-Api-Key': creds['victorops_api_key']
+        }
+
+        data = {
+            "message_type": "CRITICAL",
+            "entity_id": "streamalert/alert",
+            "entity_display_name": alert.rule_name,
+            "record": publication['record']
+        }
+
+        LOGGER.debug('Sending alert to VictorOps')
+        url = creds['url'] + '/' + creds['routing_key']
+        resp = self._post_request(
+            url,
+            data,
+            headers,
+            True
+        )
+
+        return self._check_http_response(resp)
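For reference, the request that `_dispatch` ends up sending is equivalent to the following sketch; the URL, keys, and record contents are placeholders mirroring the stored credentials:

```python
import requests

# Placeholder values; real ones come from the output's stored credentials
creds = {
    'url': 'https://example.victorops.invalid/integrations/endpoint',
    'routing_key': 'my-routing-key',
    'victorops_api_id': 'API_ID',
    'victorops_api_key': 'API_KEY',
}

resp = requests.post(
    '{}/{}'.format(creds['url'], creds['routing_key']),
    headers={
        'Content-Type': 'application/json',
        'X-VO-Api-Id': creds['victorops_api_id'],
        'X-VO-Api-Key': creds['victorops_api_key'],
    },
    json={
        'message_type': 'CRITICAL',
        'entity_id': 'streamalert/alert',
        'entity_display_name': 'my_rule_name',
        'record': {'example': 'record'},
    },
)
resp.raise_for_status()
```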
diff --git a/streamalert_cli/config.py b/streamalert_cli/config.py
index 12d732489..b40cdf5cb 100644
--- a/streamalert_cli/config.py
+++ b/streamalert_cli/config.py
@@ -23,11 +23,12 @@
 from streamalert.apps import StreamAlertApp
 from streamalert.shared import CLUSTERED_FUNCTIONS, config, metrics
 from streamalert.shared.logger import get_logger
-from streamalert_cli.helpers import continue_prompt
+from streamalert_cli.terraform import TERRAFORM_FILES_PATH
 from streamalert_cli.apps.helpers import save_app_auth_info
+from streamalert_cli.helpers import continue_prompt
 
-DEFAULT_CONFIG_PATH = 'conf'
+DEFAULT_CONFIG_PATH = 'conf'
 
 LOGGER = get_logger(__name__)
@@ -69,8 +70,21 @@ def terraform_files(self):
             self.config['global']['general'].get('terraform_files', [])
         )
 
-    @staticmethod
-    def _setup_build_directory(directory):
+    def _copy_terraform_files(self, directory):
+        """Copy all packaged terraform files and terraform files provided by the user to temp
+
+        Args:
+            directory (str): Path to the build directory receiving the Terraform files
+        """
+        shutil.copytree(TERRAFORM_FILES_PATH, directory)
+
+        # Copy any additional user provided terraform files to temp
+        for item in self.terraform_files:
+            shutil.copy2(item, directory)
+
+        LOGGER.info('Copied Terraform configuration to \'%s\'', directory)
+
+    def _setup_build_directory(self, directory):
         """Create the directory to be used for building infrastructure
 
         Args:
@@ -87,7 +101,9 @@ def _setup_build_directory(directory):
             temp_dir.cleanup()
 
         if os.path.exists(directory):
-            shutil.rmtree(directory)
+            shutil.rmtree(directory)  # shutil.copytree in python3.7 cannot handle existing dir
+
+        self._copy_terraform_files(directory)
 
         return directory
diff --git a/streamalert_cli/manage_lambda/package.py b/streamalert_cli/manage_lambda/package.py
index fcc32fd3f..560b8dca3 100644
--- a/streamalert_cli/manage_lambda/package.py
+++ b/streamalert_cli/manage_lambda/package.py
@@ -47,6 +47,7 @@ class LambdaPackage:
         'netaddr==0.8.0',
         'requests==2.24.0',
         'pymsteams==0.1.13',
+        'idna==2.8',
     }
 
     def __init__(self, config):
diff --git a/streamalert_cli/terraform/generate.py b/streamalert_cli/terraform/generate.py
index 9923edb2b..7d929fe5c 100644
--- a/streamalert_cli/terraform/generate.py
+++ b/streamalert_cli/terraform/generate.py
@@ -15,14 +15,12 @@
 """
 import json
 import os
-import shutil
 
 from streamalert.shared.config import ConfigError, firehose_alerts_bucket
 from streamalert.shared.logger import get_logger
 from streamalert.shared.utils import get_database_name, get_data_file_format
 from streamalert_cli.athena.helpers import generate_alerts_table_schema
 from streamalert_cli.helpers import check_credentials
-from streamalert_cli.terraform import TERRAFORM_FILES_PATH
 from streamalert_cli.terraform.common import (
     InvalidClusterName,
     infinitedict,
@@ -384,28 +382,6 @@ def handler(cls, options, config):
         return terraform_generate_handler(config, check_creds=False)
 
 
-def _copy_terraform_files(config):
-    """Copy all packaged terraform files and terraform files provided by the user to temp
-
-    Args:
-        config (CLIConfig): Loaded StreamAlert config
-    """
-    # Copy the packaged terraform files to temp
-    # Currently this ignores *.tf.json, in the instance that these
-    # exist in current deployments. This can be removed in a future release.
-    shutil.copytree(
-        TERRAFORM_FILES_PATH,
-        config.build_directory,
-        ignore=shutil.ignore_patterns('*.tf.json')  # TODO: remove this eventually
-    )
-
-    # Copy any additional user provided terraform files to temp
-    for item in config.terraform_files:
-        shutil.copy2(item, config.build_directory)
-
-    LOGGER.info('Copied Terraform configuration to \'%s\'', config.build_directory)
-
-
 def terraform_generate_handler(config, init=False, check_tf=True, check_creds=True):
     """Generate all Terraform plans for the configured clusters.
 
@@ -424,8 +400,6 @@ def terraform_generate_handler(config, init=False, check_tf=True, check_creds=True):
     if check_tf and not terraform_check():
         return False
 
-    _copy_terraform_files(config)
-
     # Setup the main.tf.json file
     LOGGER.debug('Generating cluster file: main.tf.json')
     _create_terraform_module_file(
@@ -634,23 +608,6 @@ def generate_global_lambda_settings(
         tf_tmp_file (str): filename of terraform file, generated by CLI.
         message (str): Message will be logged by LOGGER.
     """
-    if conf_name == 'athena_partitioner_config':
-        # Raise ConfigError when user doesn't explicitly set `file_format`
-        # in `athena_partitioner_config` in conf/lambda.json when upgrade to v3.1.0.
-        file_format = get_data_file_format(config)
-
-        if not file_format or file_format not in ('parquet', 'json'):
-            message = (
-                '[WARNING] '
-                'It is required to explicitly set "file_format" for '
-                'athena_partitioner_config in "conf/lambda.json" when upgrading to v3.1.0. '
-                'Available values are "parquet" and "json". For more information, refer to '
-                'https://github.com/airbnb/streamalert/issues/1143. '
-                'In the future release, the default value of "file_format" will '
-                'be changed to "parquet".'
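The `rmtree`-then-`copytree` sequence in `_setup_build_directory` works around `shutil.copytree` refusing to write into an existing directory on Python 3.7, per the inline comment. If the CLI only had to support Python 3.8+, the same step could collapse to `dirs_exist_ok`; a hypothetical alternative, not what this patch does:

```python
import shutil

from streamalert_cli.terraform import TERRAFORM_FILES_PATH


def copy_terraform_files_py38(directory):
    # dirs_exist_ok (Python 3.8+) merges into an existing directory,
    # removing the need for the explicit rmtree() beforehand
    shutil.copytree(TERRAFORM_FILES_PATH, directory, dirs_exist_ok=True)
```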
-            )
-            raise ConfigError(message)
-
     tf_tmp_file = os.path.join(config.build_directory, '{}.tf.json'.format(tf_tmp_file_name))
 
     if required and conf_name not in config['lambda']:
diff --git a/tests/unit/conf/outputs.json b/tests/unit/conf/outputs.json
index 5e6dd3d18..2bce1ae9a 100644
--- a/tests/unit/conf/outputs.json
+++ b/tests/unit/conf/outputs.json
@@ -29,5 +29,8 @@
     ],
     "slack": [
         "unit_test_channel"
+    ],
+    "jira-v2": [
+        "unit_test_channel"
     ]
 }
\ No newline at end of file
diff --git a/tests/unit/streamalert/alert_processor/outputs/test_jira_v2.py b/tests/unit/streamalert/alert_processor/outputs/test_jira_v2.py
new file mode 100644
index 000000000..ec113c397
--- /dev/null
+++ b/tests/unit/streamalert/alert_processor/outputs/test_jira_v2.py
@@ -0,0 +1,232 @@
+"""
+Copyright 2017-present, Airbnb Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+# pylint: disable=protected-access,attribute-defined-outside-init
+from mock import patch, PropertyMock, Mock, MagicMock
+from nose.tools import assert_equal, assert_false, assert_true
+
+from streamalert.alert_processor.outputs.jira_v2 import JiraOutput
+from tests.unit.streamalert.alert_processor.helpers import get_alert
+
+
+@patch('streamalert.alert_processor.outputs.output_base.OutputDispatcher.MAX_RETRY_ATTEMPTS', 1)
+class TestJiraOutput:
+    """Test class for JiraOutput"""
+    DESCRIPTOR = 'unit_test_jira'
+    SERVICE = 'jira-v2'
+    OUTPUT = ':'.join([SERVICE, DESCRIPTOR])
+    CREDS = {'api_key': 'xxxxyyyyyyyzzzzzzz',
+             'user_name': 'user@company.com',
+             'url': 'jira.foo.bar',
+             'project_key': 'foobar',
+             'issue_type': 'Task',
+             'aggregate': 'yes'}
+
+    @patch('streamalert.alert_processor.outputs.output_base.OutputCredentialsProvider')
+    def setup(self, provider_constructor):
+        """Setup before each method"""
+        provider = MagicMock()
+        provider_constructor.return_value = provider
+        provider.load_credentials = Mock(
+            side_effect=lambda x: self.CREDS if x == self.DESCRIPTOR else None
+        )
+        self._provider = provider
+        self._dispatcher = JiraOutput(None)
+        self._dispatcher._base_url = self.CREDS['url']
+
+    @patch('logging.Logger.info')
+    @patch('requests.get')
+    @patch('requests.post')
+    def test_dispatch_issue_new(self, post_mock, get_mock, log_mock):
+        """JiraOutput - Dispatch Success, New Issue"""
+        # setup the request to not find an existing issue
+        get_mock.return_value.status_code = 200
+        get_mock.return_value.json.return_value = {'issues': []}
+        post_mock.return_value.status_code = 200
+        post_mock.return_value.json.side_effect = [{'id': 5000}]
+
+        assert_true(self._dispatcher.dispatch(get_alert(), self.OUTPUT))
+
+        log_mock.assert_called_with('Successfully sent alert to %s:%s',
+                                    self.SERVICE, self.DESCRIPTOR)
+
+    @patch('logging.Logger.info')
+    @patch('requests.get')
+    @patch('requests.post')
+    def test_dispatch_issue_existing(self, post_mock, get_mock, log_mock):
+        """JiraOutput - Dispatch Success, Existing Issue"""
+        # setup the request to find an existing issue
+        get_mock.return_value.status_code = 200
+        existing_issues = {'issues': [{'fields': {'summary': 'Bogus'}, 'id': '5000'}]}
+        get_mock.return_value.json.return_value = existing_issues
+        post_mock.return_value.status_code = 200
+
+        assert_true(self._dispatcher.dispatch(get_alert(), self.OUTPUT))
+
+        log_mock.assert_called_with('Successfully sent alert to %s:%s',
+                                    self.SERVICE, self.DESCRIPTOR)
+
+    @patch('logging.Logger.info')
+    @patch('requests.get')
+    @patch('requests.post')
+    def test_dispatch_issue_empty_comment(self, post_mock, get_mock, log_mock):
+        """JiraOutput - Dispatch Success, Empty Comment"""
+        # setup the request to find an existing issue
+        get_mock.return_value.status_code = 200
+        existing_issues = {'issues': [{'fields': {'summary': 'Bogus'}, 'id': '5000'}]}
+        get_mock.return_value.json.return_value = existing_issues
+        type(post_mock.return_value).status_code = PropertyMock(side_effect=[200, 200, 200])
+        post_mock.return_value.json.side_effect = [{}, {'id': 5000}]
+
+        assert_true(self._dispatcher.dispatch(get_alert(), self.OUTPUT))
+
+        log_mock.assert_called_with('Successfully sent alert to %s:%s',
+                                    self.SERVICE, self.DESCRIPTOR)
+
+    @patch('requests.get')
+    def test_get_comments_success(self, get_mock):
+        """JiraOutput - Get Comments, Success"""
+        # setup successful get comments response
+        get_mock.return_value.status_code = 200
+        expected_result = [{}, {}]
+        get_mock.return_value.json.return_value = {'comments': expected_result}
+
+        self._dispatcher._load_creds(self.DESCRIPTOR)
+        assert_equal(self._dispatcher._get_comments('5000'), expected_result)
+
+    @patch('requests.get')
+    def test_get_comments_empty_success(self, get_mock):
+        """JiraOutput - Get Comments, Success Empty"""
+        # setup successful get comments empty response
+        get_mock.return_value.status_code = 200
+        get_mock.return_value.json.return_value = {}
+
+        self._dispatcher._load_creds(self.DESCRIPTOR)
+        assert_equal(self._dispatcher._get_comments('5000'), [])
+
+    @patch('requests.get')
+    def test_get_comments_failure(self, get_mock):
+        """JiraOutput - Get Comments, Failure"""
+        # setup failed get comments response
+        get_mock.return_value.status_code = 400
+
+        self._dispatcher._load_creds(self.DESCRIPTOR)
+        assert_equal(self._dispatcher._get_comments('5000'), [])
+
+    @patch('requests.get')
+    def test_search_failure(self, get_mock):
+        """JiraOutput - Search, Failure"""
+        # setup failed search response
+        get_mock.return_value.status_code = 400
+
+        self._dispatcher._load_creds(self.DESCRIPTOR)
+        assert_equal(self._dispatcher._search_jira('foobar'), [])
+
+    @patch('logging.Logger.error')
+    @patch('requests.post')
+    def test_auth_failure(self, post_mock, log_mock):
+        """JiraOutput - Auth, Failure"""
+        # setup unsuccessful auth response
+        post_mock.return_value.status_code = 400
+        post_mock.return_value.content = 'content'
+        post_mock.return_value.json.return_value = dict()
+
+        assert_false(self._dispatcher.dispatch(get_alert(), self.OUTPUT))
+
+        log_mock.assert_called_with('Failed to send alert to %s:%s', self.SERVICE, self.DESCRIPTOR)
+
+    @patch('logging.Logger.error')
+    @patch('requests.post')
+    def test_auth_empty_response(self, post_mock, log_mock):
+        """JiraOutput - Auth, Failure Empty Response"""
+        # setup unsuccessful auth response - empty body
+        post_mock.return_value.status_code = 200
+        post_mock.return_value.json.return_value = {}
+
+        assert_false(self._dispatcher.dispatch(get_alert(), self.OUTPUT))
+
+        log_mock.assert_called_with('Failed to send alert to %s:%s', self.SERVICE, self.DESCRIPTOR)
+
+    @patch('logging.Logger.error')
+    @patch('requests.get')
+    @patch('requests.post')
+    def test_issue_creation_failure(self, post_mock, get_mock, log_mock):
+ """JiraOutput - Issue Creation, Failure""" + # setup the successful search response - no results + get_mock.return_value.status_code = 200 + get_mock.return_value.json.return_value = {'issues': []} + post_mock.return_value.content = 'some bad content' + post_mock.return_value.json.side_effect = [dict()] + + assert_false(self._dispatcher.dispatch(get_alert(), self.OUTPUT)) + + log_mock.assert_called_with('Failed to send alert to %s:%s', self.SERVICE, self.DESCRIPTOR) + + @patch('logging.Logger.error') + @patch('requests.get') + @patch('requests.post') + def test_issue_creation_empty_search(self, post_mock, get_mock, log_mock): + """JiraOutput - Issue Creation, Failure Empty Search""" + # setup the successful search response - empty response + get_mock.return_value.status_code = 200 + get_mock.return_value.json.return_value = {} + post_mock.return_value.content = 'some bad content' + post_mock.return_value.json.side_effect = [dict()] + + assert_false(self._dispatcher.dispatch(get_alert(), self.OUTPUT)) + + log_mock.assert_called_with('Failed to send alert to %s:%s', self.SERVICE, self.DESCRIPTOR) + + @patch('logging.Logger.error') + @patch('requests.get') + @patch('requests.post') + def test_issue_creation_empty_response(self, post_mock, get_mock, log_mock): + """JiraOutput - Issue Creation, Failure Empty Response""" + # setup the successful search response - no results + get_mock.return_value.status_code = 200 + get_mock.return_value.json.return_value = {'issues': []} + # setup successful auth response and failed issue creation - empty response + type(post_mock.return_value).status_code = PropertyMock(side_effect=[200, 200]) + post_mock.return_value.json.side_effect = [{}] + + assert_false(self._dispatcher.dispatch(get_alert(), self.OUTPUT)) + + log_mock.assert_called_with('Failed to send alert to %s:%s', self.SERVICE, self.DESCRIPTOR) + + @patch('logging.Logger.error') + @patch('requests.get') + @patch('requests.post') + def test_comment_creation_failure(self, post_mock, get_mock, log_mock): + """JiraOutput - Comment Creation, Failure""" + # setup successful search response + get_mock.return_value.status_code = 200 + existing_issues = {'issues': [{'fields': {'summary': 'Bogus'}, 'id': '5000'}]} + get_mock.return_value.json.return_value = existing_issues + type(post_mock.return_value).status_code = PropertyMock(side_effect=[400, 200]) + post_mock.return_value.json.side_effect = [{'id': 6000}] + + assert_true(self._dispatcher.dispatch(get_alert(), self.OUTPUT)) + + log_mock.assert_called_with('Encountered an error when adding alert to existing Jira ' + 'issue %s. 
+                                    'issue %s. Attempting to create new Jira issue.', 5000)
+
+    @patch('logging.Logger.error')
+    def test_dispatch_bad_descriptor(self, log_error_mock):
+        """JiraOutput - Dispatch Failure, Bad Descriptor"""
+        assert_false(
+            self._dispatcher.dispatch(get_alert(), ':'.join([self.SERVICE, 'bad_descriptor'])))
+
+        log_error_mock.assert_called_with('Failed to send alert to %s:%s',
+                                          self.SERVICE, 'bad_descriptor')
diff --git a/tests/unit/streamalert/alert_processor/outputs/test_output_base.py b/tests/unit/streamalert/alert_processor/outputs/test_output_base.py
index 94554256a..357862c61 100644
--- a/tests/unit/streamalert/alert_processor/outputs/test_output_base.py
+++ b/tests/unit/streamalert/alert_processor/outputs/test_output_base.py
@@ -101,13 +101,15 @@ def test_output_loading():
         'demisto',
         'github',
         'jira',
+        'jira-v2',
         'komand',
         'pagerduty',
         'pagerduty-v2',
         'pagerduty-incident',
         'phantom',
         'slack',
-        'teams'
+        'teams',
+        'victorops'
     }
 
     assert_count_equal(loaded_outputs, expected_outputs)
diff --git a/tests/unit/streamalert_cli/manage_lambda/test_package.py b/tests/unit/streamalert_cli/manage_lambda/test_package.py
index d1d1bf2b3..eadd79100 100644
--- a/tests/unit/streamalert_cli/manage_lambda/test_package.py
+++ b/tests/unit/streamalert_cli/manage_lambda/test_package.py
@@ -16,7 +16,7 @@
 # pylint: disable=no-self-use,protected-access
 import os
 
-from mock import patch
+from mock import patch, Mock
 from pyfakefs import fake_filesystem_unittest
 
 from streamalert_cli.config import CLIConfig
@@ -28,6 +28,7 @@ class PackageTest(fake_filesystem_unittest.TestCase):
     TEST_CONFIG_PATH = 'tests/unit/conf'
     MOCK_TEMP_PATH = '/tmp/test_packaging'
 
+    @patch('streamalert_cli.config.CLIConfig._copy_terraform_files', Mock())
     def setUp(self):
         self.setUpPyfakefs()
         self.fs.add_real_directory(self.TEST_CONFIG_PATH)
diff --git a/tests/unit/streamalert_cli/terraform/test_firehose.py b/tests/unit/streamalert_cli/terraform/test_firehose.py
index 4c35a9c74..8be250adb 100644
--- a/tests/unit/streamalert_cli/terraform/test_firehose.py
+++ b/tests/unit/streamalert_cli/terraform/test_firehose.py
@@ -218,7 +218,6 @@ def test_firehose_enabled_log_json(self):
             'json:embedded': {}
         }
 
-        self.config = CLIConfig(config_path='tests/unit/conf_athena')
         firehose.generate_firehose(self._logging_bucket_name, cluster_dict, self.config)
 
         expected_result = {
@@ -228,7 +227,7 @@ def test_firehose_enabled_log_json(self):
                     'source': './modules/tf_kinesis_firehose_delivery_stream',
                     'buffer_size': 128,
                     'buffer_interval': 900,
-                    'file_format': 'json',
+                    'file_format': 'parquet',
                     'stream_name': 'unit_test_streamalert_json_embedded',
                     'role_arn': '${module.kinesis_firehose_setup.firehose_role_arn}',
                     's3_bucket_name': 'unit-test-streamalert-data',
diff --git a/tests/unit/streamalert_cli/terraform/test_generate.py b/tests/unit/streamalert_cli/terraform/test_generate.py
index 1a3da5656..0d4bb8de8 100644
--- a/tests/unit/streamalert_cli/terraform/test_generate.py
+++ b/tests/unit/streamalert_cli/terraform/test_generate.py
@@ -854,32 +854,6 @@ def test_generate_main_with_sqs_url_false(self):
         assert_equal(result['module']['globals']['source'], './modules/tf_globals')
         assert_false(result['module']['globals']['sqs_use_prefix'])
 
-    def test_generate_athena_lambda_format_unspecified(self):
-        """CLI - Terraform Generate Global Lambda Settings, Unspecified Athena file_format"""
-        self.config['lambda']['athena_partitioner_config']['file_format'] = None
-
-        assert_raises(
-            ConfigError,
-            generate.generate_global_lambda_settings,
-            config=self.config,
-            conf_name='athena_partitioner_config',
-            generate_func='test_func',
-            tf_tmp_file_name='test_tf_tmp_file_path',
-        )
-
-    def test_generate_athena_lambda_format_invalid(self):
-        """CLI - Terraform Generate Global Lambda Settings, Invalid Athena file_format"""
-        self.config['lambda']['athena_partitioner_config']['file_format'] = 'Parquet'
-
-        assert_raises(
-            ConfigError,
-            generate.generate_global_lambda_settings,
-            config=self.config,
-            conf_name='athena_partitioner_config',
-            generate_func='test_func',
-            tf_tmp_file_name='test_tf_tmp_file_path',
-        )
-
     def test_generate_required_lambda_invalid_config(self):
         """CLI - Terraform Generate Global Lambda Settings, Invalid Config"""
diff --git a/tests/unit/streamalert_cli/test_cli_config.py b/tests/unit/streamalert_cli/test_cli_config.py
index aa9662711..3ff7c16c9 100644
--- a/tests/unit/streamalert_cli/test_cli_config.py
+++ b/tests/unit/streamalert_cli/test_cli_config.py
@@ -16,7 +16,7 @@
 # pylint: disable=protected-access
 import json
 
-from mock import patch
+from mock import patch, Mock
 from nose.tools import assert_equal, assert_true, assert_false
 from pyfakefs import fake_filesystem_unittest
 
@@ -31,6 +31,7 @@ def __init__(self):
         self.config = None
         self.fs_patcher = None
 
+    @patch('streamalert_cli.config.CLIConfig._copy_terraform_files', Mock())
     def setup(self):
         """Setup before each method"""
         config_data = basic_streamalert_config()