Navidrome-Tracker/MusicDatabase.py
2025-04-22 16:35:48 -05:00

163 lines
4.0 KiB
Python

import datetime
import sqlite3
import logging
import os
from dataclasses import dataclass
from typing import Self
DATABASE = "music-history.db"
@dataclass
class Album:
id: int
name: str
mbid: str | None = None
cover_art_url: str | None = None
@classmethod
def from_row(cls, row: tuple[int, str, str | None, str | None]) -> Self:
return cls(*row)
class Artist:
pass
@dataclass
class Song:
id: int
name: str
length: int
album_id: int
artist_ids: list[int]
mbid: str | None = None # Music Brainz Id
@classmethod
def from_rows(cls, rows: list[tuple[int, str, int, int, int, str | None]]) -> Self:
artist_ids: list[int] = [row[4] for row in rows]
first_row = rows[0]
return cls(
first_row[0],
first_row[1],
first_row[2],
first_row[3],
artist_ids,
first_row[5]
)
def __str__(self) -> str:
return f'Title: {self.name}, Length: {self.length}, Album Id: {self.album_id}, Artist Ids: {self.artist_ids}, MBID: {self.mbid}'
class Connect:
def __init__(self, database: str):
self.con: sqlite3.Connection = sqlite3.connect(database)
def __enter__(self) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
return self.con, self.con.cursor()
def __exit__(self, type, value, traceback):
self.con.commit()
self.con.close()
# Takes a path to a .sql file and returns it's contents
def _load_query(path: str) -> str:
return open(path, 'r').read()
def execute_query(query_path: str, qargs: list | tuple) -> list:
if not os.path.exists(query_path):
logging.error(f'Query Path "{query_path}" does not exist')
return []
query: str = _load_query(query_path)
with Connect(DATABASE) as (conn, cur):
cur.execute(query, qargs)
return cur.fetchall()
def insert_listen_event(song_id: int, user_id: int, date: int | datetime.datetime, time: int):
pass
def search_song_mbid(mbid: str) -> Song | None:
results: list = execute_query('sql/get_song_by_mbid.sql', (mbid,))
if results:
return Song.from_rows(results)
return None
def search_song_name(name: str) -> list[Song] | None:
results: list = execute_query('sql/get_song_by_name.sql', (name,))
if not results:
return None
songs: list[Song] = []
current_song_id: int | None = None
rows = []
for row in results:
if not current_song_id:
current_song_id = row[0]
rows.append(row)
elif row[0] != current_song_id:
songs.append(Song.from_rows(rows))
current_song_id = row[0]
rows = [row]
else:
rows.append(row)
return songs
def search_song_by_id(id: int) -> Song | None:
results: list = execute_query('sql/get_song_by_id.sql', (id,))
if results:
return Song.from_rows(results)
return None
def insert_song(id: int, name: str, length: int, album_id: int, artist_id: int, mbid: str | None = None) -> None:
execute_query('sql/insert_song.sql', (id, name, length, album_id, artist_id, mbid))
def get_last_song_id() -> int:
return execute_query('sql/get_last_song_id.sql', [])[0][0]
def search_album_by_name(name: str) -> list[Album] | None:
results: list = execute_query('sql/get_album_by_name.sql', (name,))
if not results:
return None
albums: list[Album] = []
for row in results:
albums.append(Album.from_row(row))
def search_album_by_mbid(mbid: str) -> Album | None:
pass
def search_album_by_id(id: int) -> Album | None:
pass
def insert_album(name: str, mbid: str | None, cover_art_url: str | None) -> None:
pass
def search_artist_by_name(name: str) -> list[Artist] | None:
pass
def search_artist_by_mbid(mbid: str) -> Artist | None:
pass
def search_artist_by_id(id: int) -> Artist | None:
pass
def insert_artist(name: str, mbid: str | None = None, icon_url: str | None = None) -> None:
pass