-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
executable file
·202 lines (173 loc) · 7.89 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, LargeBinary, Text, JSON
from sqlalchemy.orm import sessionmaker, scoped_session, relationship
from sqlalchemy.ext.declarative import declarative_base
from flask import current_app
from datetime import datetime
from sqlalchemy import or_, func, cast, String
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy.exc import IntegrityError
from app.logger_config import logger
from sqlalchemy import text, UniqueConstraint
# Thread-scoped session registry shared by this module. The sessionmaker is
# created without an engine; init_db() binds one later through
# Session.configure(bind=engine).
Session = scoped_session(sessionmaker())
# Declarative base class that every ORM model below inherits from.
Base = declarative_base()
class Item(Base):
    """Top-level catalogue entry, looked up by its unique ``imdb_id``.

    Metadata rows, seasons and the poster hang off an item; each of those
    relationships uses the ``all, delete-orphan`` cascade, so they are
    removed together with the item.
    """

    __tablename__ = 'items'

    # --- columns ---------------------------------------------------------
    id = Column(Integer, primary_key=True)
    imdb_id = Column(String, unique=True, index=True)
    title = Column(String, nullable=False)
    year = Column(Integer)
    type = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # --- relationships ---------------------------------------------------
    # Named ``item_metadata`` because ``metadata`` is reserved on
    # declarative classes.
    item_metadata = relationship(
        "Metadata",
        back_populates="item",
        cascade="all, delete-orphan",
    )
    seasons = relationship(
        "Season",
        back_populates="item",
        cascade="all, delete-orphan",
    )
    # uselist=False: an item exposes a single poster object, not a list.
    poster = relationship(
        "Poster",
        back_populates="item",
        uselist=False,
        cascade="all, delete-orphan",
    )
class Metadata(Base):
    """One key/value metadata entry attached to an ``Item``.

    ``value`` is stored in a JSON column, and ``provider`` records which
    source supplied the entry. No uniqueness constraint is declared on
    ``(item_id, key)`` here.
    """

    __tablename__ = 'metadata'

    id = Column(Integer, primary_key=True)
    item_id = Column(Integer, ForeignKey('items.id'), nullable=False)
    key = Column(String, nullable=False)
    # JSON column (previously Text).
    value = Column(JSON, nullable=False)
    provider = Column(String)
    last_updated = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Owning item; mirrors Item.item_metadata.
    item = relationship("Item", back_populates="item_metadata")
class Season(Base):
    """A numbered season belonging to an ``Item``.

    The ``uix_item_season`` constraint allows each ``season_number`` only
    once per item. Episodes are deleted together with their season.
    """

    __tablename__ = 'seasons'
    __table_args__ = (
        UniqueConstraint('item_id', 'season_number', name='uix_item_season'),
    )

    id = Column(Integer, primary_key=True)
    item_id = Column(Integer, ForeignKey('items.id'), nullable=False)
    season_number = Column(Integer, nullable=False)
    episode_count = Column(Integer)

    # Parent item; mirrors Item.seasons.
    item = relationship("Item", back_populates="seasons")
    episodes = relationship(
        "Episode",
        back_populates="season",
        cascade="all, delete-orphan",
    )
class Episode(Base):
    """A single episode belonging to a ``Season``.

    ``episode_imdb_id`` is unique and indexed. ``imdb_id`` is a plain,
    unconstrained string column.
    """

    __tablename__ = 'episodes'

    id = Column(Integer, primary_key=True)
    season_id = Column(Integer, ForeignKey('seasons.id'), nullable=False)
    episode_number = Column(Integer, nullable=False)
    episode_imdb_id = Column(String, unique=True, index=True)
    title = Column(String)
    overview = Column(Text)
    runtime = Column(Integer)
    first_aired = Column(DateTime)
    # NOTE(review): presumably the IMDb id of the parent show -- confirm
    # against the code that populates episodes.
    imdb_id = Column(String)

    # Parent season; mirrors Season.episodes. Declared once: the original
    # class body assigned this same relationship twice.
    season = relationship("Season", back_populates="episodes")
class Poster(Base):
    """Binary poster image stored for an ``Item``.

    ``item_id`` is unique, so at most one poster row can exist per item.
    """

    __tablename__ = 'posters'

    id = Column(Integer, primary_key=True)
    item_id = Column(
        Integer,
        ForeignKey('items.id'),
        nullable=False,
        unique=True,
    )
    image_data = Column(LargeBinary)
    last_updated = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Owning item; mirrors Item.poster.
    item = relationship("Item", back_populates="poster")
class TMDBToIMDBMapping(Base):
    """Lookup row pairing a TMDB identifier with an IMDb identifier.

    Both identifier columns are unique and indexed, so each id can appear
    in at most one row.
    """

    __tablename__ = 'tmdb_to_imdb_mapping'

    id = Column(Integer, primary_key=True)
    tmdb_id = Column(String, unique=True, index=True)
    imdb_id = Column(String, unique=True, index=True)
class DatabaseManager:
    """Static helpers that read and write the ORM models through ``Session``.

    Every method uses the module-level scoped session as a context manager,
    so the session is closed when the method finishes. ``Session`` has to be
    bound to an engine (``init_db`` does this) before any method is called.
    """

    @staticmethod
    def add_or_update_item(imdb_id, title, year=None, item_type=None):
        """Create the item for ``imdb_id`` or update the existing one.

        Args:
            imdb_id: IMDb identifier used to look the item up.
            title: Title to store; always overwrites the existing title.
            year: Release year; an existing value is kept when ``None``.
            item_type: Item type; an existing value is kept when ``None``.

        Returns:
            The primary key of the created or updated item.
        """
        with Session() as session:
            item = session.query(Item).filter_by(imdb_id=imdb_id).first()
            if item:
                item.title = title
                if year is not None:
                    item.year = year
                if item_type is not None:
                    item.type = item_type
                item.updated_at = datetime.utcnow()
            else:
                item = Item(imdb_id=imdb_id, title=title, year=year, type=item_type)
                session.add(item)
            session.commit()
            # Read the id while the session is still open.
            return item.id

    @staticmethod
    def add_or_update_metadata(imdb_id, metadata_dict, provider):
        """Store every entry of ``metadata_dict`` as metadata of an item.

        The item is created when it does not exist yet, using
        ``metadata_dict['title']`` (or an empty string) as its title. The
        item's type is taken from ``metadata_dict['type']``; without it the
        type becomes ``'show'`` when an ``'aired_episodes'`` key is present
        and ``'movie'`` otherwise. The ``'type'`` key itself is not stored as
        a metadata row.

        Args:
            imdb_id: IMDb identifier of the item.
            metadata_dict: Mapping of metadata key to JSON-serialisable value.
            provider: Name of the source; recorded on newly created rows.
        """
        with Session() as session:
            item = session.query(Item).filter_by(imdb_id=imdb_id).first()
            if not item:
                item = Item(imdb_id=imdb_id, title=metadata_dict.get('title', ''))
                session.add(item)
                # Flush so the new item has a primary key for the rows below.
                session.flush()

            item_type = metadata_dict.get('type')
            if item_type:
                item.type = item_type
            elif 'aired_episodes' in metadata_dict:
                item.type = 'show'
            else:
                item.type = 'movie'

            # Load the item's existing rows once instead of issuing one
            # SELECT per key. setdefault keeps a single row per key, matching
            # the previous ``.first()`` lookup if duplicates ever exist.
            existing = {}
            for row in session.query(Metadata).filter_by(item_id=item.id):
                existing.setdefault(row.key, row)

            now = datetime.utcnow()
            for key, value in metadata_dict.items():
                if key == 'type':
                    continue
                row = existing.get(key)
                if row is not None:
                    # NOTE(review): ``provider`` of an existing row is left
                    # unchanged even when another provider supplies the new
                    # value -- confirm this is intended.
                    row.value = value
                    row.last_updated = now
                else:
                    session.add(
                        Metadata(
                            item_id=item.id,
                            key=key,
                            value=value,
                            provider=provider,
                            last_updated=now,
                        )
                    )
            session.commit()
            logger.debug(f"Metadata for {imdb_id} updated in battery")

    @staticmethod
    def get_item(imdb_id):
        """Return the item for ``imdb_id`` or ``None`` when it is unknown.

        ``item_metadata`` and ``poster`` are loaded eagerly; ``seasons`` is
        not. The session is closed before the item is returned, so lazy
        attributes may not be loadable on the result.
        """
        with Session() as session:
            return (
                session.query(Item)
                .options(joinedload(Item.item_metadata), joinedload(Item.poster))
                .filter_by(imdb_id=imdb_id)
                .first()
            )

    @staticmethod
    def get_all_items():
        """Return every item with its ``item_metadata`` eagerly loaded."""
        with Session() as session:
            # The original looped over the items computing an unused
            # ``year_metadata`` value; that dead code has been removed.
            return session.query(Item).options(joinedload(Item.item_metadata)).all()

    @staticmethod
    def delete_item(imdb_id):
        """Delete the item for ``imdb_id``.

        Returns:
            ``True`` when an item was found and deleted, ``False`` otherwise.
        """
        with Session() as session:
            item = session.query(Item).filter_by(imdb_id=imdb_id).first()
            if item:
                session.delete(item)
                session.commit()
                return True
            return False

    @staticmethod
    def add_or_update_poster(item_id, image_data):
        """Store ``image_data`` as the poster of the item with ``item_id``.

        An existing poster row is overwritten and its ``last_updated``
        refreshed; otherwise a new row is created.
        """
        with Session() as session:
            poster = session.query(Poster).filter_by(item_id=item_id).first()
            if poster:
                poster.image_data = image_data
                poster.last_updated = datetime.utcnow()
            else:
                poster = Poster(item_id=item_id, image_data=image_data)
                session.add(poster)
            session.commit()

    @staticmethod
    def get_poster(imdb_id):
        """Return the poster bytes for ``imdb_id``.

        Returns ``None`` when the item does not exist or has no poster.
        """
        with Session() as session:
            item = session.query(Item).filter_by(imdb_id=imdb_id).first()
            if item and item.poster:
                return item.poster.image_data
            return None
def _redact_connection_string(connection_string):
    """Return ``connection_string`` with any password masked, for logging."""
    from sqlalchemy.engine import make_url
    from sqlalchemy.exc import ArgumentError

    try:
        return make_url(connection_string).render_as_string(hide_password=True)
    except (ArgumentError, AttributeError):
        # Not a parseable URL (or not a string at all); never echo it raw.
        return "<unparseable connection string>"


def init_db(app):
    """Connect to the first reachable database and prepare it for use.

    Candidates are tried in order: the app's ``SQLALCHEMY_DATABASE_URI``,
    a hard-coded local PostgreSQL instance, then a SQLite fallback file.
    For the first candidate that accepts a ``SELECT 1``, all tables are
    created and the module-level ``Session`` is bound to the engine.

    Args:
        app: Object whose ``config`` mapping provides
            ``SQLALCHEMY_DATABASE_URI``.

    Returns:
        The engine of the database that was connected.

    Raises:
        KeyError: If ``SQLALCHEMY_DATABASE_URI`` is missing from the config.
        Exception: If none of the candidates could be initialised; the last
            underlying error is attached as the cause.
    """
    connection_strings = [
        app.config['SQLALCHEMY_DATABASE_URI'],
        'postgresql://cli_debrid:cli_debrid@localhost:5433/cli_battery_database',
        'sqlite:///fallback.db',  # SQLite fallback as last resort
    ]

    last_error = None
    for connection_string in connection_strings:
        # Connection strings can carry credentials; log a masked form only.
        safe_url = _redact_connection_string(connection_string)
        engine = None
        try:
            logger.info(f"Attempting to connect to database: {safe_url}")
            engine = create_engine(connection_string)
            # Test the connection
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            Base.metadata.create_all(engine)
            # Bind only after the schema exists, so a failed attempt never
            # leaves Session pointing at an unusable engine.
            Session.configure(bind=engine)
            logger.info(f"Successfully connected to database: {safe_url}")
            logger.info("All database tables created successfully.")
            return engine
        except Exception as e:
            last_error = e
            logger.error(f"Failed to connect to {safe_url}: {str(e)}")
            if engine is not None:
                # Release pooled connections held by the rejected engine.
                engine.dispose()

    logger.critical("All database connection attempts failed.")
    raise Exception("Unable to connect to any database") from last_error