import datetime import sqlite3 import logging import os from dataclasses import dataclass from typing import Self DATABASE = "music-history.db" @dataclass class Album: id: int name: str mbid: str | None = None cover_art_url: str | None = None @classmethod def from_row(cls, row: tuple[int, str, str | None, str | None]) -> Self: return cls(*row) @dataclass class Artist: id: int name: str mbid: str | None = None icon_url: str | None = None @classmethod def from_row(cls, row: tuple[int, str, str | None, str | None]) -> Self: return cls(*row) @dataclass class Song: id: int name: str length: int album_id: int artist_ids: list[int] mbid: str | None = None # Music Brainz Id @classmethod def from_rows(cls, rows: list[tuple[int, str, int, int, int, str | None]]) -> Self: artist_ids: list[int] = [row[4] for row in rows] first_row = rows[0] return cls( first_row[0], first_row[1], first_row[2], first_row[3], artist_ids, first_row[5] ) def __str__(self) -> str: return f'Title: {self.name}, Length: {self.length}, Album Id: {self.album_id}, Artist Ids: {self.artist_ids}, MBID: {self.mbid}' class Connect: def __init__(self, database: str): self.con: sqlite3.Connection = sqlite3.connect(database) def __enter__(self) -> tuple[sqlite3.Connection, sqlite3.Cursor]: return self.con, self.con.cursor() def __exit__(self, type, value, traceback): self.con.commit() self.con.close() # Takes a path to a .sql file and returns it's contents def _load_query(path: str) -> str: return open(path, 'r').read() def execute_query(query_path: str, *args) -> list: if not os.path.exists(query_path): logging.error(f'Query Path "{query_path}" does not exist') return [] query: str = _load_query(query_path) with Connect(DATABASE) as (conn, cur): cur.execute(query, [arg for arg in args]) return cur.fetchall() def execute_insertion_query(query_path: str, *args) -> int | None: if not os.path.exists(query_path): logging.error(f'Query Path "{query_path}" does not exist') return None query: str = _load_query(query_path) with Connect(DATABASE) as (conn, cur): cur.execute(query, [arg for arg in args]) return cur.lastrowid def insert_listen_event(song_id: int, user_id: int, date: int | datetime.datetime, time: int): pass def search_song_mbid(mbid: str) -> Song | None: results: list = execute_query('sql/get_song_by_mbid.sql', mbid) if results: return Song.from_rows(results) return None def search_song_name(name: str) -> list[Song]: name = '%' + name + '%' results: list = execute_query('sql/get_song_by_name.sql', name) if not results: return [] songs: list[Song] = [] current_song_id: int | None = None rows = [] for row in results: if not current_song_id: current_song_id = row[0] rows.append(row) elif row[0] != current_song_id: songs.append(Song.from_rows(rows)) current_song_id = row[0] rows = [row] else: rows.append(row) return songs def search_song_by_id(id: int) -> Song | None: results: list = execute_query('sql/get_song_by_id.sql', id) if results: return Song.from_rows(results) return None def insert_song(id: int, name: str, length: int, album_id: int, artist_id: int, mbid: str | None = None) -> None: execute_query('sql/insert_song.sql', id, name, length, album_id, artist_id, mbid) def get_last_song_id() -> int: return execute_query('sql/get_last_song_id.sql')[0][0] def search_album_by_name(name: str) -> list[Album]: name = '%' + name + '%' results: list = execute_query('sql/get_album_by_name.sql', name) if not results: return [] albums: list[Album] = [] for row in results: albums.append(Album.from_row(row)) return albums def search_album_by_mbid(mbid: str) -> Album | None: results: list = execute_query('sql/get_album_by_mbid.sql', mbid) return Album.from_row(results[0]) if results else None def search_album_by_id(id: int) -> Album | None: results: list = execute_query('sql/get_album_by_id.sql', id) return Album.from_row(results[0]) if results else None def insert_album(name: str, mbid: str | None = None, cover_art_url: str | None = None) -> Album: return search_album_by_id(execute_insertion_query('sql/insert_album.sql', name, mbid, cover_art_url)) def search_artist_by_name(name: str) -> list[Artist]: name = '%' + name + '%' results: list = execute_query('sql/get_artist_by_name.sql', name) if not results: return [] artists: list[Artist] = [] for row in results: artists.append(Artist.from_row(row)) return artists def search_artist_by_mbid(mbid: str) -> Artist | None: results: list = execute_query('sql/get_artist_by_mbid.sql', mbid) return Artist.from_row(results[0]) if results else None def search_artist_by_id(id: int) -> Artist | None: results: list = execute_query('sql/get_artist_by_id.sql', id) return Artist.from_row(results[0]) if results else None def insert_artist(name: str, mbid: str | None = None, icon_url: str | None = None) -> None: return search_artist_by_id(execute_insertion_query('sql/insert_artist.sql', name, mbid, icon_url))