Compare commits

...

1 Commits

Author SHA1 Message Date
JISAUAY
eedc2f0471 had co pilot write some stuff 2025-06-09 14:16:57 -05:00
8 changed files with 77 additions and 4933 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
.vscode .vscode
__pycache__ __pycache__
config.ini config.ini
*.json
*.log

View File

@ -1,10 +1,11 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
@dataclass @dataclass
class ListenBrainzScrobble: class ListenBrainzScrobble:
artist_name: str artist_name: str
release_name: str release_name: str
track_name: str track_name: str
listened_at: datetime
# Optional # Optional
artist_mbids: list[str] | None = None artist_mbids: list[str] | None = None
@ -56,5 +57,6 @@ class ListenBrainzScrobble:
artist_name=data['artist_name'], artist_name=data['artist_name'],
release_name=data['release_name'], release_name=data['release_name'],
track_name=data['track_name'], track_name=data['track_name'],
listened_at=datetime.fromtimestamp(data['listened_at']),
**{key:(add_info[key] if key in add_info else None) for key in additional_info_keys} **{key:(add_info[key] if key in add_info else None) for key in additional_info_keys}
) )

View File

@ -97,7 +97,10 @@ def execute_insertion_query(query_path: str, *args) -> int | None:
return cur.lastrowid return cur.lastrowid
def insert_listen_event(song_id: int, user_id: int, date: int | datetime.datetime, time: int): def insert_listen_event(song_id: int, user_id: int, date: int | datetime.datetime, time: int):
pass if isinstance(date, datetime.datetime):
date = int(date.timestamp())
execute_insertion_query('sql/insert_listen_event.sql', song_id, user_id, date, time)
def search_song_mbid(mbid: str) -> Song | None: def search_song_mbid(mbid: str) -> Song | None:
results: list = execute_query('sql/get_song_by_mbid.sql', mbid) results: list = execute_query('sql/get_song_by_mbid.sql', mbid)
@ -199,5 +202,5 @@ def search_artist_by_id(id: int) -> Artist | None:
return Artist.from_row(results[0]) if results else None return Artist.from_row(results[0]) if results else None
def insert_artist(name: str, mbid: str | None = None, icon_url: str | None = None) -> None: def insert_artist(name: str, mbid: str | None = None, icon_url: str | None = None) -> Artist | None:
return search_artist_by_id(execute_insertion_query('sql/insert_artist.sql', name, mbid, icon_url)) return search_artist_by_id(execute_insertion_query('sql/insert_artist.sql', name, mbid, icon_url))

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,9 @@ def main():
now_playing = conn.getNowPlaying() now_playing = conn.getNowPlaying()
with open('navidrome-request.json', 'w') as file:
file.write(json.dumps(now_playing))
# print(json.dumps(now_playing, indent=4)) # print(json.dumps(now_playing, indent=4))

Binary file not shown.

View File

@ -1,3 +1,4 @@
import difflib
import logging import logging
import json import json
import re import re
@ -10,15 +11,21 @@ import MusicDatabase
logging.basicConfig( logging.basicConfig(
format="%(levelname)s %(module)s::%(filename)s::%(funcName)s %(asctime)s - %(message)s", format="%(levelname)s %(module)s::%(filename)s::%(funcName)s %(asctime)s - %(message)s",
level=logging.DEBUG level=logging.DEBUG,
filename="Hey-Listen.log"
) )
# TODO : Do something about this
USER_ID = 1
app = flask.Flask(__name__) app = flask.Flask(__name__)
musicbrainzngs.set_useragent("Navidrome Scrobbler", "beta 0.0") musicbrainzngs.set_useragent("Navidrome Scrobbler", "beta 0.0")
# Returns a tuple of (track_mbid, album_mbid, artist_mbids: list) # Returns a tuple of (track_mbid, album_mbid, artist_mbids: list)
# TODO improve matching instead of assuming the top result is correct def similarity(a: str, b: str) -> float:
return difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, list[str]]: def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, list[str]]:
optional_args = {} optional_args = {}
if scrobble.duration: if scrobble.duration:
@ -26,28 +33,49 @@ def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, lis
elif scrobble.duration_ms: elif scrobble.duration_ms:
optional_args['dur'] = scrobble.duration_ms optional_args['dur'] = scrobble.duration_ms
result = musicbrainzngs.search_recordings(f"{scrobble.track_name} {scrobble.release_name}", artist=scrobble.artist_name, **optional_args) query = f"{scrobble.track_name} {scrobble.release_name}"
result = musicbrainzngs.search_recordings(query, artist=scrobble.artist_name, **optional_args)
# logging.debug(json.dumps(result, indent=4))
with open('listen-brain-request.json', 'w') as file: with open('listen-brain-request.json', 'w') as file:
file.write(json.dumps(result, indent=4)) file.write(json.dumps(result, indent=4))
top_result: dict = result['recording-list'][0] candidates = result.get('recording-list', [])
if not candidates:
raise ValueError("No MusicBrainz results found.")
# print(json.dumps(top_result, indent=4)) def score(candidate):
score = 0
# Title similarity
score += similarity(candidate.get('title', ''), scrobble.track_name) * 2
# Artist similarity
artist_names = " ".join([a['name'] for a in candidate.get('artist-credit', [])])
score += similarity(artist_names, scrobble.artist_name) * 2
# Album similarity
if 'release-list' in candidate and candidate['release-list']:
score += similarity(candidate['release-list'][0].get('title', ''), scrobble.release_name)
# Duration similarity (if available)
if scrobble.duration_ms and 'length' in candidate:
try:
mb_length = int(candidate['length'])
diff = abs(mb_length - scrobble.duration_ms)
score += max(0, 1 - diff / 10000) # 10s tolerance
except Exception:
pass
return score
track_mbid: str = top_result['id'] best = max(candidates, key=score)
track_name: str = top_result['title']
album_mbid: str = top_result['release-list'][0]['id'] track_mbid: str = best['id']
album_name: str = top_result['release-list'][0]['title'] track_name: str = best['title']
# print(json.dumps(top_result[''])) album_mbid: str = best['release-list'][0]['id'] if best.get('release-list') else ''
artist_mbids: list[str] = [artist['artist']['id'] for artist in top_result['artist-credit']] album_name: str = best['release-list'][0]['title'] if best.get('release-list') else ''
artist_names: list[str] = [artist['name'] for artist in top_result['artist-credit']] artist_mbids: list[str] = [artist['artist']['id'] for artist in best.get('artist-credit', [])]
artist_names: list[str] = [artist['name'] for artist in best.get('artist-credit', [])]
logging.debug(f'Matched scrobble with {track_name=} {album_name=} {artist_names=}') logging.debug(f'Matched scrobble with {track_name=} {album_name=} {artist_names=}')
return (track_mbid, album_mbid, artist_mbids) return (track_mbid, album_mbid, artist_mbids)
def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble) -> None: def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble, user_id: int) -> None:
track_mbid, album_mbid, artist_mbids = get_mbid(scrobble) track_mbid, album_mbid, artist_mbids = get_mbid(scrobble)
if not (album := MusicDatabase.search_album_by_mbid(album_mbid)): if not (album := MusicDatabase.search_album_by_mbid(album_mbid)):
@ -55,14 +83,28 @@ def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble) -> None:
artists: list[MusicDatabase.Artist] = [] artists: list[MusicDatabase.Artist] = []
for artist_mbid in artist_mbids: for artist_mbid in artist_mbids:
if not (artist := MusicDatabase.search_artist_by_mbid(artist_mbid)): artist = MusicDatabase.search_artist_by_mbid(artist_mbid)
artists.append(MusicDatabase.insert_artist(scrobble.artist_name, artist_mbid)) if not artist:
artist_id: MusicDatabase.Artist = MusicDatabase.insert_artist(scrobble.artist_name, artist_mbid)
artists.append(artist_id)
if not (track := MusicDatabase.search_song_mbid(track_mbid)) and scrobble.duration_ms: # Ensure song exists for each artist (one row per artist)
track = MusicDatabase.search_song_mbid(track_mbid)
if not track and scrobble.duration_ms:
song_id: int = MusicDatabase.get_last_song_id() + 1 song_id: int = MusicDatabase.get_last_song_id() + 1
for artist in artists:
MusicDatabase.insert_song(song_id, scrobble.track_name, scrobble.duration_ms, album.id, artist.id, track_mbid) MusicDatabase.insert_song(song_id, scrobble.track_name, scrobble.duration_ms, album.id, artist.id, track_mbid)
track = MusicDatabase.search_song_mbid(track_mbid)
# Insert listen event
if track:
listened_at = getattr(scrobble, 'listened_at', None)
if listened_at is None and hasattr(scrobble, 'timestamp'):
listened_at = scrobble.timestamp
if listened_at is None:
import time
listened_at = int(time.time())
MusicDatabase.insert_listen_event(track.id, user_id, listened_at, 0)
# See https://listenbrainz.readthedocs.io/en/latest/users/json.html#json-doc # See https://listenbrainz.readthedocs.io/en/latest/users/json.html#json-doc
@app.route('/1/submit-listens', methods=["POST"]) @app.route('/1/submit-listens', methods=["POST"])
@ -91,7 +133,7 @@ def submit_listens():
scrobble = ListenBrainz.ListenBrainzScrobble.from_json(scrobble_data) scrobble = ListenBrainz.ListenBrainzScrobble.from_json(scrobble_data)
record_scrobble(scrobble, USER_ID)
return '', 200 return '', 200

View File

@ -1 +1 @@
INSERT INTO "listen-events" (sond_id, user_id, date, time) VALUES (?, ?, ?, ?); INSERT INTO "listen-events" (song_id, user_id, date, time) VALUES (?, ?, ?, ?);