Navidrome-Tracker/scrobble.py
2025-06-09 14:16:57 -05:00

143 lines
5.2 KiB
Python

import difflib
import logging
import json
import re
import musicbrainzngs
import flask
import ListenBrainz
import MusicDatabase
# Route every log record at DEBUG level or above into Hey-Listen.log.
logging.basicConfig(
    filename="Hey-Listen.log",
    level=logging.DEBUG,
    format="%(levelname)s %(module)s::%(filename)s::%(funcName)s %(asctime)s - %(message)s",
)

# TODO : Do something about this
# (every submitted listen is attributed to this fixed user ID).
USER_ID = 1

# Flask application object on which the HTTP routes of this module register.
app = flask.Flask(__name__)

# Identify this client to the MusicBrainz web service.
musicbrainzngs.set_useragent("Navidrome Scrobbler", "beta 0.0")
# NOTE: get_mbid() (defined below) returns a tuple of (track_mbid, album_mbid, artist_mbids: list)
def similarity(a: str, b: str) -> float:
    """Return the case-insensitive similarity ratio of two strings.

    Both strings are lower-cased and compared with
    ``difflib.SequenceMatcher``; the result is its ``ratio()``, a float
    from 0.0 (nothing in common) to 1.0 (identical).
    """
    matcher = difflib.SequenceMatcher(a=a.lower(), b=b.lower())
    return matcher.ratio()
def _artist_credit_entries(recording: dict) -> list[dict]:
    """Return only the artist dicts of a recording's ``artist-credit`` list.

    musicbrainzngs interleaves plain join-phrase strings (e.g. " & ",
    " feat. ") between the artist dicts; those are filtered out here so
    callers can index every entry safely.
    """
    return [
        credit
        for credit in recording.get('artist-credit', [])
        if isinstance(credit, dict)
    ]


def _credited_name(credit: dict) -> str:
    """Return the display name of one artist-credit entry ('' if unknown).

    The entry-level ``name`` is optional, so fall back to the nested
    artist's own name.
    """
    return credit.get('name') or credit.get('artist', {}).get('name', '')


def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, list[str]]:
    """Find the MusicBrainz recording that best matches a scrobble.

    Searches MusicBrainz recordings for the scrobble's track/release/artist
    (and duration, when known), scores every candidate on title, artist,
    album and duration similarity, and picks the highest-scoring one.

    Side effect: the raw search response is dumped to
    ``listen-brain-request.json`` (overwritten on every call).

    Args:
        scrobble: The listen to match. Reads ``track_name``, ``release_name``,
            ``artist_name``, ``duration`` (treated as seconds) and
            ``duration_ms``.

    Returns:
        A tuple ``(track_mbid, album_mbid, artist_mbids)``. ``album_mbid`` is
        ``''`` when the best match lists no release.

    Raises:
        ValueError: If the search returns no recordings.
    """
    optional_args = {}
    if scrobble.duration:
        # MusicBrainz's "dur" search field is expressed in milliseconds.
        optional_args['dur'] = scrobble.duration * 1000
    elif scrobble.duration_ms:
        optional_args['dur'] = scrobble.duration_ms

    # Skip a missing release name instead of leaking "None" into the query.
    query = " ".join(
        part for part in (scrobble.track_name, scrobble.release_name) if part
    )
    result = musicbrainzngs.search_recordings(query, artist=scrobble.artist_name, **optional_args)

    # Debugging aid: keep the last raw response on disk.
    with open('listen-brain-request.json', 'w') as file:
        file.write(json.dumps(result, indent=4))

    candidates = result.get('recording-list', [])
    if not candidates:
        raise ValueError("No MusicBrainz results found.")

    # Duration used for scoring: prefer the exact millisecond value, else
    # fall back to the seconds-derived value that was sent with the query.
    target_ms = scrobble.duration_ms or optional_args.get('dur')

    def score(candidate: dict) -> float:
        """Score one candidate; title and artist count double."""
        total = 0.0

        # Title similarity
        total += similarity(candidate.get('title', ''), scrobble.track_name) * 2

        # Artist similarity (join phrases are ignored, names space-joined)
        credited = " ".join(
            _credited_name(credit) for credit in _artist_credit_entries(candidate)
        )
        total += similarity(credited, scrobble.artist_name) * 2

        # Album similarity, against the first listed release only
        releases = candidate.get('release-list')
        if releases and scrobble.release_name:
            total += similarity(releases[0].get('title', ''), scrobble.release_name)

        # Duration similarity (if available): linear falloff over 10 seconds
        if target_ms and 'length' in candidate:
            try:
                diff = abs(int(candidate['length']) - target_ms)
            except (TypeError, ValueError):
                # Unparseable length: leave duration out of the score.
                pass
            else:
                total += max(0, 1 - diff / 10000)  # 10s tolerance
        return total

    best = max(candidates, key=score)
    best_releases = best.get('release-list') or []
    first_release = best_releases[0] if best_releases else {}
    best_credits = _artist_credit_entries(best)

    track_mbid: str = best['id']
    track_name: str = best.get('title', '')
    album_mbid: str = first_release.get('id', '')
    album_name: str = first_release.get('title', '')
    artist_mbids: list[str] = [
        credit['artist']['id']
        for credit in best_credits
        if credit.get('artist', {}).get('id')
    ]
    artist_names: list[str] = [_credited_name(credit) for credit in best_credits]
    logging.debug(f'Matched scrobble with {track_name=} {album_name=} {artist_names=}')
    return (track_mbid, album_mbid, artist_mbids)
def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble, user_id: int) -> None:
    """Persist one scrobble as a listen event, creating missing rows first.

    Resolves the scrobble to MusicBrainz IDs via ``get_mbid``, makes sure the
    album, every credited artist and the song exist in ``MusicDatabase``, and
    then inserts a listen event for ``user_id``.

    Args:
        scrobble: The listen to store.
        user_id: ID of the user the listen is attributed to.

    Raises:
        ValueError: Propagated from ``get_mbid`` when MusicBrainz returns no
            match.
    """
    import time  # function-scope import: `time` is not imported at module level

    track_mbid, album_mbid, artist_mbids = get_mbid(scrobble)

    album = MusicDatabase.search_album_by_mbid(album_mbid)
    if not album:
        album = MusicDatabase.insert_album(scrobble.release_name, album_mbid)

    # Collect every credited artist, whether it already existed or was just
    # inserted; previously only the insert path bound a value to append.
    # NOTE(review): assumes search_artist_by_mbid returns the same Artist
    # shape (with `.id`) as insert_artist - confirm against MusicDatabase.
    # NOTE(review): every new artist is stored under scrobble.artist_name,
    # because get_mbid does not return per-artist names.
    artists: list[MusicDatabase.Artist] = []
    for artist_mbid in artist_mbids:
        artist = MusicDatabase.search_artist_by_mbid(artist_mbid)
        if not artist:
            artist = MusicDatabase.insert_artist(scrobble.artist_name, artist_mbid)
        artists.append(artist)

    # Ensure song exists for each artist (one row per artist)
    track = MusicDatabase.search_song_mbid(track_mbid)
    if not track:
        # Accept either duration field; get_mbid treats `duration` as seconds.
        duration_ms = scrobble.duration_ms
        if not duration_ms and scrobble.duration:
            duration_ms = int(scrobble.duration * 1000)
        if duration_ms:
            song_id: int = MusicDatabase.get_last_song_id() + 1
            for artist in artists:
                MusicDatabase.insert_song(
                    song_id, scrobble.track_name, duration_ms, album.id, artist.id, track_mbid
                )
            track = MusicDatabase.search_song_mbid(track_mbid)

    if not track:
        # No song row exists or could be created, so there is nothing to
        # attach the listen to. Make the drop visible in the log.
        logging.warning("Dropping listen: no song row for track_mbid=%s", track_mbid)
        return

    # Insert listen event; timestamp falls back from `listened_at` to
    # `timestamp` to the current time.
    listened_at = getattr(scrobble, 'listened_at', None)
    if listened_at is None:
        listened_at = getattr(scrobble, 'timestamp', None)
    if listened_at is None:
        listened_at = int(time.time())
    # NOTE(review): meaning of the trailing 0 is defined by
    # MusicDatabase.insert_listen_event - not visible here.
    MusicDatabase.insert_listen_event(track.id, user_id, listened_at, 0)
# See https://listenbrainz.readthedocs.io/en/latest/users/json.html#json-doc
@app.route('/1/submit-listens', methods=["POST"])
def submit_listens():
    """Handle a ListenBrainz-style ``submit-listens`` POST.

    Requires an ``Authorization: Token <token>`` header and a JSON object
    body with ``listen_type`` and a ``payload`` list; every payload entry is
    parsed into a scrobble and recorded for ``USER_ID``.

    Returns:
        ``('', 401)`` when the Authorization header is missing or malformed,
        ``('', 400)`` when the body is not an object with ``listen_type`` and
        a ``payload`` list, and ``('', 200)`` after all entries are recorded.
    """
    auth_header = flask.request.headers.get('Authorization')
    if auth_header is None:
        return '', 401
    user_token_match: re.Match[str] | None = re.match(r'Token (.*)', auth_header)
    if not user_token_match:
        return '', 401
    user_token: str = user_token_match.group(1)

    # A missing or malformed body is a client error, not an auth failure.
    request_json = flask.request.json
    if not isinstance(request_json, dict) or 'listen_type' not in request_json:
        return '', 400
    listen_type: str = request_json['listen_type']
    payload = request_json.get('payload')
    if not isinstance(payload, list):
        return '', 400

    # NOTE(review): the token is only logged, never validated, and ends up in
    # the log file in plain text - confirm this is acceptable.
    logging.debug("User Token %s", user_token)
    # NOTE(review): every listen_type is recorded alike, including
    # "playing_now" notifications - confirm that is intended.
    logging.debug("Listen type %s", listen_type)

    for scrobble_data in payload:
        logging.debug("Received scrobble %s", json.dumps(scrobble_data, indent=4))
        scrobble = ListenBrainz.ListenBrainzScrobble.from_json(scrobble_data)
        # NOTE(review): a ValueError from get_mbid (no MusicBrainz match)
        # propagates from here and aborts the rest of the batch.
        record_scrobble(scrobble, USER_ID)
    return '', 200
if __name__ == '__main__':
    # Serve on the loopback interface only when executed directly as a script.
    app.run(host='127.0.0.1', port=8545)