-
Notifications
You must be signed in to change notification settings - Fork 310
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
55ccb06
commit 6cc7435
Showing
6 changed files
with
142 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import json | ||
import warnings | ||
from urllib.parse import urlunsplit, urlsplit | ||
|
||
import requests | ||
from requests import RequestException | ||
|
||
from elastalert.alerts import Alerter, DateTimeEncoder | ||
from elastalert.util import EAException, elastalert_logger | ||
|
||
|
||
class YzjAlerter(Alerter): | ||
""" Creates a YZJ room message for each alert """ | ||
required_options = frozenset(['yzj_token']) | ||
|
||
def __init__(self, rule): | ||
super(YzjAlerter, self).__init__(rule) | ||
self.yzj_token = self.rule.get('yzj_token', None) | ||
self.yzj_type = self.rule.get('yzj_type', 0) | ||
self.yzj_webhook_url = 'https://www.yunzhijia.com/gateway/robot/webhook/send?yzjtype=%s&yzjtoken=%s' % (self.yzj_type, self.yzj_token) | ||
self.yzj_proxy = self.rule.get('yzj_proxy', None) | ||
self.yzj_custom_loc = self.rule.get('yzj_custom_loc', None) | ||
|
||
def alert(self, matches): | ||
body = self.create_alert_body(matches) | ||
|
||
proxies = {'https': self.yzj_proxy} if self.yzj_proxy else None | ||
headers = { | ||
'Content-Type': 'application/json', | ||
'Accept': 'application/json;charset=utf-8' | ||
} | ||
|
||
if self.yzj_custom_loc is not None: | ||
part = urlsplit(self.yzj_webhook_url) | ||
custom_part = part._replace(netloc=self.yzj_custom_loc) | ||
self.yzj_webhook_url = urlunsplit(custom_part) | ||
|
||
payload = { | ||
'content': body | ||
} | ||
|
||
try: | ||
response = requests.post(self.yzj_webhook_url, data=json.dumps(payload, cls=DateTimeEncoder), | ||
headers=headers, | ||
proxies=proxies) | ||
warnings.resetwarnings() | ||
response.raise_for_status() | ||
except RequestException as e: | ||
raise EAException("Error posting to yzj: %s" % e) | ||
|
||
elastalert_logger.info("Trigger sent to YZJ") | ||
|
||
def get_info(self): | ||
return { | ||
"type": "yzj", | ||
"yzj_webhook_url": self.yzj_webhook_url | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import json | ||
import logging | ||
|
||
from unittest import mock | ||
|
||
from elastalert.alerters.yzj import YzjAlerter | ||
from elastalert.loaders import FileRulesLoader | ||
|
||
|
||
def test_yzj_text(caplog): | ||
caplog.set_level(logging.INFO) | ||
rule = { | ||
'name': 'Test YZJ Rule', | ||
'type': 'any', | ||
'yzj_token': 'xxxxxxx', | ||
'yzj_custom_loc': 'www.myloc.cn', | ||
'alert': [], | ||
'alert_subject': 'Test YZJ' | ||
} | ||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = YzjAlerter(rule) | ||
match = { | ||
'@timestamp': '2021-01-01T00:00:00', | ||
'somefield': 'foobarbaz' | ||
} | ||
|
||
with mock.patch('requests.post') as mock_post_request: | ||
alert.alert([match]) | ||
|
||
expected_data = { | ||
'content': 'Test YZJ Rule\n\n@timestamp: 2021-01-01T00:00:00\nsomefield: foobarbaz\n' | ||
} | ||
|
||
mock_post_request.assert_called_once_with( | ||
'https://www.myloc.cn/gateway/robot/webhook/send?yzjtype=0&yzjtoken=xxxxxxx', | ||
data=mock.ANY, | ||
headers={ | ||
'Content-Type': 'application/json', | ||
'Accept': 'application/json;charset=utf-8' | ||
}, | ||
proxies=None | ||
) | ||
|
||
actual_data = json.loads(mock_post_request.call_args_list[0][1]['data']) | ||
assert expected_data == actual_data | ||
assert ('elastalert', logging.INFO, 'Trigger sent to YZJ') == caplog.record_tuples[0] |