-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsend_igtwitter.py
executable file
·225 lines (181 loc) · 8.34 KB
/
send_igtwitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#!/usr/bin/env seiscomp-python
from datetime import datetime, timedelta
import sys
import os
sys.path.append(os.path.join(os.environ['SEISCOMP_ROOT'], 'share/gds/tools/'))
###
# This import is located here due to local libraries that
# CAN NOT be appended to PYTHONPATH due to name repetition
###
from lib import bulletin, spooler
import tweepy
from ig_gds_utilities import ig_utilities as utilities
from db_igtwitter import TwitterDB
import logging
import logging.config
# Log file of this service, located under $SEISCOMP_ROOT/var/log/.
logging_file = os.path.join(os.environ['SEISCOMP_ROOT'], 'var/log/', 'gds_service_igtwitter.log')
# Disable the non-root loggers that already exist at this point (i.e. those
# created by the modules imported above); loggers created later are unaffected.
logging.config.dictConfig({'version': 1, 'disable_existing_loggers': True})
# Route log records to the file above, each line prefixed with a timestamp.
logging.basicConfig(filename=logging_file, format='%(asctime)s %(message)s')
# Module-wide logger; DEBUG level so that no record is filtered out by level.
logger = logging.getLogger("igtwitter")
logger.setLevel(logging.DEBUG)
class TwitterConfig:
    """
    Plain holder for the Twitter related settings of the spooler.

    Values are read from the ``twitter`` and ``twitter_db`` sections of the
    given configuration object and stored as attributes of the same name.
    """

    def __init__(self, config):
        """
        Read the ``twitter`` and ``twitter_db`` options from ``config``.

        A failure while reading a section is logged and not re-raised; the
        options of that section which were not read yet stay undefined on
        the instance, and the next section is still processed.

        :param config: object offering ``get(section, option)``
        """
        # (section, message logged on failure, ((option, converter or None), ...))
        layout = (
            (
                "twitter",
                "##Error reading twitter config file: %s",
                (
                    ("accounts_file", None),
                    ("hour_limit", int),
                    ("eqevent_path", None),
                ),
            ),
            (
                "twitter_db",
                "##Error reading twitter db config file: %s",
                (
                    ("db_file", None),
                    ("db_table_name", None),
                ),
            ),
        )
        for section, failure_message, options in layout:
            try:
                for option, convert in options:
                    raw_value = config.get(section, option)
                    if convert is not None:
                        raw_value = convert(raw_value)
                    setattr(self, option, raw_value)
            except Exception as err:
                logger.error(failure_message % str(err))
class SpoolSendTwitter(spooler.Spooler):
    """
    Spooler that publishes event bulletins as tweets.

    Every successful publication is stored through ``TwitterDB`` keyed by
    event id, event status and target, and that record is consulted before
    posting so the same bulletin is not tweeted twice to the same target.
    """

    def __init__(self):
        """
        Load the ``send_igtwitter`` configuration, read the twitter account
        information and create the ``TwitterDB`` object.

        Failures while reading the accounts file or creating the DB object
        are logged and not re-raised.

        :returns: None
        """
        spooler.Spooler.__init__(self)
        self.twitter_config = TwitterConfig(self._config)
        logger.info("##Configuration loaded: %s" % self.twitter_config.hour_limit)
        try:
            self.twitter_accounts = utilities.read_config_file(self.twitter_config.accounts_file)
        except Exception as e:
            # Logged as an error, like the other failure paths of this class.
            logger.error("Error reading twitter_accounts file: %s" % str(e))
        try:
            logger.info("Create db object. It will initialize the db if there is none")
            self.twt_db = TwitterDB(self._config)
            logger.info("## twt_db object created:%s" % self.twt_db)
        except Exception as e:
            logger.error("Error creating TwitterDB object: %s" % str(e))

    def spool(self, addresses, content):
        """
        Publish the bulletin in ``content`` to every target in ``addresses``.

        The bulletin text is parsed for the event id, status and origin time.
        Events older than the configured ``hour_limit`` are ignored. Each
        address is checked against the DB and skipped when the event was
        already published to it with the same status; otherwise the tweet is
        posted and recorded in the DB. All addresses are processed; the
        method stops early only on a failure.

        :param addresses: list of addresses; element ``[1]`` of each address
            is used as the key into the twitter accounts and as ``gds_target``
        :type addresses: list
        :param content: raw bulletin passed to ``bulletin.Bulletin.read``
        :returns: True when the event is too old or when every address was
            published (or had been published before); False when a tweet
            could not be posted or its DB record could not be saved
        :rtype: boolean
        :raises Exception: when the bulletin can not be read or when an
            unexpected error happens while publishing
        """
        logger.info("##Start spool() for SpoolSendTwitter with: %s" % (addresses))
        try:
            bulletin_object = bulletin.Bulletin()
            bulletin_object.read(content)
        except Exception as e:
            raise Exception("Error starting spool(): %s" % str(e))

        logger.debug("Event info to tweet: %s" % (bulletin_object.plain))
        # The plain text is split on single spaces: field 1 holds
        # "<label>:<event id>", field 2 the status, fields 3 and 4 date and time.
        event_info = bulletin_object.plain.split(" ")
        event_id = event_info[1].split(":")[1]
        event_status = event_info[2]
        event_datetime = datetime.strptime("%s %s" % (event_info[3], event_info[4]), "%Y-%m-%d %H:%M:%S")

        # TODO: create/check the image in case eqevents isn't ready.
        # Prefer the PNG map; fall back to the JPG name (its existence is not
        # checked here, a missing file makes post_event() fail and return False).
        event_image_path = "%s/%s/%s-map.png" % (self.twitter_config.eqevent_path, event_id, event_id)
        if not os.path.isfile(event_image_path):
            event_image_path = "%s/%s/%s-map.jpg" % (self.twitter_config.eqevent_path, event_id, event_id)
        event_dict = {'text': '%s' % bulletin_object.plain, 'path': event_image_path}
        logger.info("event info to look in db: %s %s %s" % (event_id, event_status, event_datetime))

        # Events outside the hour_limit are dropped, which counts as handled.
        if not self.check_antiquity(event_datetime):
            logger.info("event too old. Limit is %s hours" % self.twitter_config.hour_limit)
            return True

        for address in addresses:
            target = address[1]
            # Skip only this target; the remaining ones may still need the tweet.
            if self._already_published(event_id, event_status, target):
                logger.info("Event %s already published" % event_id)
                continue
            try:
                logger.info("Start tweet publication")
                twitter_account = self.twitter_accounts[target]
                twitter_api = self.connect_twitter(twitter_account)
                logger.info("Conection to twitter ok: %s" % twitter_api)
                tweet_id = self.post_event(twitter_api, event_dict)
                if tweet_id is False:
                    logger.error("Error posting tweet")
                    return False
                logger.info("Insert tweet_id into DB")
                event_row = {'event_id': event_id, 'tweet_id': tweet_id, 'status': event_status, 'gds_target': target}
                # save_post() returning 0 is treated as success.
                if self.twt_db.save_post(event_row) != 0:
                    logger.info("Failed to insert tweet info into DB")
                    return False
                logger.info("Post info inserted into DB: %s" % event_row)
            except Exception as e:
                logger.error("Error in spool: %s" % str(e))
                raise Exception("Error in spool: %s" % str(e))
        return True

    def _already_published(self, event_id, event_status, target):
        """
        Tell whether the DB holds a post for this event, status and target.

        :param event_id: event identifier taken from the bulletin
        :param event_status: event status taken from the bulletin
        :param target: target (``address[1]``) the tweet would be sent to
        :returns: True when a matching row exists, False otherwise
        :rtype: boolean
        """
        # NOTE(review): the where clause is built by string interpolation;
        # event_id comes from the bulletin text - confirm it is trusted or
        # switch to a parameterized query if TwitterDB offers one.
        where = "event_id='%s'" % event_id
        rows = self.twt_db.get_post("*", where)
        for row in rows:
            if row['event_id'] == event_id and row['status'] == event_status and row['gds_target'] == target:
                return True
        return False

    def connect_twitter(self, token_dict):
        """
        Build an authenticated twitter api object from ``token_dict``.

        :param token_dict: tokens under the keys ``api_key``, ``api_secret``,
            ``access_token`` and ``secret_token``
        :type token_dict: dict
        :returns: twitter_api
        :raises Exception: when the api object can not be created
        """
        try:
            auth = tweepy.OAuthHandler(token_dict['api_key'], token_dict['api_secret'])
            auth.set_access_token(token_dict['access_token'], token_dict['secret_token'])
            return tweepy.API(auth)
        except Exception as e:
            logger.error("Error trying to connect twitter: %s" % str(e))
            raise Exception("Error trying to connect twitter: %s" % str(e))

    def post_event(self, twitter_api, event_dict):
        """
        Upload the image in ``event_dict['path']`` and publish a tweet with
        the text in ``event_dict['text']``.

        :param twitter_api: authenticated api object from ``connect_twitter``
        :type twitter_api: obj
        :param event_dict: dictionary with the keys ``text`` and ``path``
        :type event_dict: dict
        :returns: id of the published tweet, or False when anything fails
        """
        try:
            logger.info("Start post tweet")
            media = twitter_api.media_upload(event_dict['path'])
            tweet = twitter_api.update_status(status=event_dict['text'], media_ids=[media.media_id])
            logger.info("Posted event to twitter successfully")
            return tweet.id
        except Exception as e:
            # Failure is reported through the return value; spool() checks it.
            logger.error("Error trying to post to twitter : %s" % str(e))
            return False

    def check_antiquity(self, limit_date_time):
        """
        Check that an event time is newer than ``hour_limit`` hours ago.

        :param limit_date_time: time of the event
        :type limit_date_time: datetime
        :returns: True when the event is within the limit, False otherwise
        :rtype: boolean
        """
        # NOTE(review): datetime.now() is naive local time - confirm the
        # bulletin timestamp is expressed in the same timezone.
        oldest_allowed = datetime.now() - timedelta(hours=self.twitter_config.hour_limit)
        return oldest_allowed < limit_date_time
if __name__ == "__main__":
    # Script entry point: build the spooler and run it by calling the instance.
    spool_app = SpoolSendTwitter()
    spool_app()