204 lines
5.5 KiB
Python
204 lines
5.5 KiB
Python
import datetime
|
|
import sqlite3
|
|
import logging
|
|
import os
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Self
|
|
|
|
# Path of the SQLite database file opened by the query helpers in this module.
# The path is relative, so it is resolved against the process's working directory.
DATABASE = "music-history.db"
|
|
|
|
@dataclass
|
|
class Album:
|
|
id: int
|
|
name: str
|
|
mbid: str | None = None
|
|
cover_art_url: str | None = None
|
|
|
|
@classmethod
|
|
def from_row(cls, row: tuple[int, str, str | None, str | None]) -> Self:
|
|
return cls(*row)
|
|
|
|
@dataclass
|
|
class Artist:
|
|
id: int
|
|
name: str
|
|
mbid: str | None = None
|
|
icon_url: str | None = None
|
|
|
|
@classmethod
|
|
def from_row(cls, row: tuple[int, str, str | None, str | None]) -> Self:
|
|
return cls(*row)
|
|
|
|
@dataclass
|
|
class Song:
|
|
id: int
|
|
name: str
|
|
length: int
|
|
album_id: int
|
|
artist_ids: list[int]
|
|
mbid: str | None = None # Music Brainz Id
|
|
|
|
@classmethod
|
|
def from_rows(cls, rows: list[tuple[int, str, int, int, int, str | None]]) -> Self:
|
|
artist_ids: list[int] = [row[4] for row in rows]
|
|
first_row = rows[0]
|
|
|
|
return cls(
|
|
first_row[0],
|
|
first_row[1],
|
|
first_row[2],
|
|
first_row[3],
|
|
artist_ids,
|
|
first_row[5]
|
|
)
|
|
|
|
def __str__(self) -> str:
|
|
return f'Title: {self.name}, Length: {self.length}, Album Id: {self.album_id}, Artist Ids: {self.artist_ids}, MBID: {self.mbid}'
|
|
|
|
|
|
class Connect:
    """Context manager that opens a SQLite connection and hands out a cursor.

    Entering the block yields ``(connection, cursor)``. When the block ends
    normally the pending transaction is committed; when it ends with an
    exception the transaction is rolled back. The connection is closed in
    both cases.
    """

    def __init__(self, database: str):
        """Open a connection to the SQLite database at ``database``."""
        self.con: sqlite3.Connection = sqlite3.connect(database)

    def __enter__(self) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
        """Return the open connection together with a new cursor on it."""
        return self.con, self.con.cursor()

    def __exit__(self, type, value, traceback):
        """Commit or roll back, then always close the connection.

        Returns None, so an exception raised inside the block propagates.
        """
        try:
            if type is None:
                self.con.commit()
            else:
                # The block failed: do not persist a half-finished unit of work.
                self.con.rollback()
        finally:
            # Close even if commit/rollback raises, so the handle is not leaked.
            self.con.close()
|
|
|
|
# Takes a path to a .sql file and returns its contents
def _load_query(path: str) -> str:
    """Read and return the full text of the file at ``path``.

    The file is decoded as UTF-8 so the result does not depend on the
    platform's default locale encoding.

    Raises:
        OSError: if the file cannot be opened (e.g. ``FileNotFoundError``).
    """
    # The context manager closes the handle promptly instead of leaving it
    # open until the garbage collector gets to it.
    with open(path, 'r', encoding='utf-8') as query_file:
        return query_file.read()
|
|
|
|
def execute_query(query_path: str, *args) -> list:
    """Run the SQL stored in the file at ``query_path`` and return every row.

    ``args`` are passed to the cursor as the statement's parameters, in
    order. If ``query_path`` does not exist, an error is logged and an empty
    list is returned without touching the database.
    """
    if os.path.exists(query_path):
        sql: str = _load_query(query_path)
        params = list(args)

        with Connect(DATABASE) as (_connection, cursor):
            cursor.execute(sql, params)
            # Fetch while still inside the block: leaving it closes the connection.
            return cursor.fetchall()

    logging.error(f'Query Path "{query_path}" does not exist')
    return []
|
|
|
|
def execute_insertion_query(query_path: str, *args) -> int | None:
|
|
if not os.path.exists(query_path):
|
|
logging.error(f'Query Path "{query_path}" does not exist')
|
|
return None
|
|
|
|
query: str = _load_query(query_path)
|
|
|
|
with Connect(DATABASE) as (conn, cur):
|
|
cur.execute(query, [arg for arg in args])
|
|
|
|
return cur.lastrowid
|
|
|
|
def insert_listen_event(song_id: int, user_id: int, date: int | datetime.datetime, time: int):
|
|
pass
|
|
|
|
def search_song_mbid(mbid: str) -> Song | None:
    """Look up a song by running ``sql/get_song_by_mbid.sql`` with ``mbid`` as its parameter.

    All returned rows are combined into one ``Song``; ``None`` is returned
    when the query yields no rows.
    """
    matching_rows: list = execute_query('sql/get_song_by_mbid.sql', mbid)
    return Song.from_rows(matching_rows) if matching_rows else None
|
|
|
|
def search_song_name(name: str) -> list[Song]:
    """Return every song matched by ``sql/get_song_by_name.sql`` for ``name``.

    ``name`` is wrapped in ``%`` wildcards before being passed as the query's
    single parameter (presumably for a LIKE match; confirm against the .sql
    file). The query may return several rows per song; rows sharing the same
    song id (column 0) are merged into one ``Song``.

    Returns:
        One ``Song`` per distinct song id, in order of first appearance, or
        an empty list when nothing matched.
    """
    pattern = '%' + name + '%'
    results: list = execute_query('sql/get_song_by_name.sql', pattern)

    if not results:
        return []

    # Group rows by song id. A dict keeps first-seen order, treats id 0 like
    # any other id, and guarantees that the final group is emitted as well.
    rows_by_song_id: dict[int, list] = {}
    for row in results:
        rows_by_song_id.setdefault(row[0], []).append(row)

    return [Song.from_rows(song_rows) for song_rows in rows_by_song_id.values()]
|
|
|
|
def search_song_by_id(id: int) -> Song | None:
    """Look up a song by running ``sql/get_song_by_id.sql`` with ``id`` as its parameter.

    All returned rows are combined into one ``Song``; ``None`` is returned
    when the query yields no rows.
    """
    matching_rows: list = execute_query('sql/get_song_by_id.sql', id)
    if not matching_rows:
        return None
    return Song.from_rows(matching_rows)
|
|
|
|
def insert_song(id: int, name: str, length: int, album_id: int, artist_id: int, mbid: str | None = None) -> None:
    """Run ``sql/insert_song.sql`` with the given values as its parameters.

    The values are bound in the order id, name, length, album_id, artist_id,
    mbid. Nothing is returned.
    """
    parameters = (id, name, length, album_id, artist_id, mbid)
    execute_query('sql/insert_song.sql', *parameters)
|
|
|
|
def get_last_song_id() -> int:
    """Return the first column of the first row produced by ``sql/get_last_song_id.sql``.

    Raises:
        IndexError: if the query yields no rows.
    """
    rows = execute_query('sql/get_last_song_id.sql')
    first_row = rows[0]
    return first_row[0]
|
|
|
|
def search_album_by_name(name: str) -> list[Album]:
    """Return the albums matched by ``sql/get_album_by_name.sql`` for ``name``.

    ``name`` is wrapped in ``%`` wildcards before being passed as the query's
    single parameter. Each returned row becomes one ``Album``; the list is
    empty when nothing matched.
    """
    pattern = '%' + name + '%'
    matches: list = execute_query('sql/get_album_by_name.sql', pattern)

    return [Album.from_row(match) for match in matches] if matches else []
|
|
|
|
def search_album_by_mbid(mbid: str) -> Album | None:
    """Look up an album by running ``sql/get_album_by_mbid.sql`` with ``mbid`` as its parameter.

    Only the first returned row is used; ``None`` is returned when there are
    no rows.
    """
    matches: list = execute_query('sql/get_album_by_mbid.sql', mbid)
    if not matches:
        return None
    return Album.from_row(matches[0])
|
|
|
|
def search_album_by_id(id: int) -> Album | None:
    """Look up an album by running ``sql/get_album_by_id.sql`` with ``id`` as its parameter.

    Only the first returned row is used; ``None`` is returned when there are
    no rows.
    """
    matches: list = execute_query('sql/get_album_by_id.sql', id)
    if not matches:
        return None
    return Album.from_row(matches[0])
|
|
|
|
def insert_album(name: str, mbid: str | None = None, cover_art_url: str | None = None) -> Album | None:
    """Insert an album via ``sql/insert_album.sql`` and return the stored record.

    The new row is read back by the id the insertion reports.

    Returns:
        The inserted ``Album``, or ``None`` when the insertion reported no
        row id (for example because the query file is missing) or the new
        row could not be read back.
    """
    new_album_id = execute_insertion_query('sql/insert_album.sql', name, mbid, cover_art_url)
    if new_album_id is None:
        # Nothing to look up; avoid running a query with a NULL id.
        return None
    return search_album_by_id(new_album_id)
|
|
|
|
def search_artist_by_name(name: str) -> list[Artist]:
    """Return the artists matched by ``sql/get_artist_by_name.sql`` for ``name``.

    ``name`` is wrapped in ``%`` wildcards before being passed as the query's
    single parameter. Each returned row becomes one ``Artist``; the list is
    empty when nothing matched.
    """
    pattern = '%' + name + '%'
    matches: list = execute_query('sql/get_artist_by_name.sql', pattern)

    return [Artist.from_row(match) for match in matches] if matches else []
|
|
|
|
def search_artist_by_mbid(mbid: str) -> Artist | None:
    """Look up an artist by running ``sql/get_artist_by_mbid.sql`` with ``mbid`` as its parameter.

    Only the first returned row is used; ``None`` is returned when there are
    no rows.
    """
    matches: list = execute_query('sql/get_artist_by_mbid.sql', mbid)
    if not matches:
        return None
    return Artist.from_row(matches[0])
|
|
|
|
def search_artist_by_id(id: int) -> Artist | None:
    """Look up an artist by running ``sql/get_artist_by_id.sql`` with ``id`` as its parameter.

    Only the first returned row is used; ``None`` is returned when there are
    no rows.
    """
    matches: list = execute_query('sql/get_artist_by_id.sql', id)
    if not matches:
        return None
    return Artist.from_row(matches[0])
|
|
|
|
def insert_artist(name: str, mbid: str | None = None, icon_url: str | None = None) -> Artist | None:
    """Insert an artist via ``sql/insert_artist.sql`` and return the stored record.

    The new row is read back by the id the insertion reports.

    Returns:
        The inserted ``Artist``, or ``None`` when the insertion reported no
        row id (for example because the query file is missing) or the new
        row could not be read back.
    """
    new_artist_id = execute_insertion_query('sql/insert_artist.sql', name, mbid, icon_url)
    if new_artist_id is None:
        # Nothing to look up; avoid running a query with a NULL id.
        return None
    return search_artist_by_id(new_artist_id)
|