"""ListenBrainz-compatible scrobble endpoint.

Receives listens on ``/1/submit-listens``, matches each one against
MusicBrainz to obtain MBIDs, and stores the album, artists, song and listen
event through the ``MusicDatabase`` module.
"""

import difflib
import json
import logging
import re
import time

import flask
import musicbrainzngs

import ListenBrainz
import MusicDatabase

logging.basicConfig(
    format="%(levelname)s %(module)s::%(filename)s::%(funcName)s %(asctime)s - %(message)s",
    level=logging.DEBUG,
    filename="Hey-Listen.log"
)

# TODO : Do something about this
USER_ID = 1

app = flask.Flask(__name__)
musicbrainzngs.set_useragent("Navidrome Scrobbler", "beta 0.0")


def similarity(a: str, b: str) -> float:
    """Return the case-insensitive ``difflib`` similarity ratio of *a* and *b*.

    The result is in the range 0.0 (nothing in common) to 1.0 (identical).
    """
    return difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()


def _scrobble_duration_ms(scrobble: ListenBrainz.ListenBrainzScrobble) -> int | None:
    """Return the scrobble's duration in milliseconds, or None when unknown.

    ``duration_ms`` is used when set; otherwise ``duration`` is converted,
    assuming it is expressed in seconds (the original search filter multiplied
    it by 1000) -- TODO confirm against ListenBrainzScrobble.
    """
    if scrobble.duration_ms:
        return scrobble.duration_ms
    if scrobble.duration:
        return int(scrobble.duration * 1000)
    return None


def _artist_credits(recording: dict) -> list[dict]:
    """Return the dict entries of a recording's ``artist-credit`` list.

    musicbrainzngs interleaves plain join-phrase strings (e.g. ``" & "``) with
    the artist dicts; indexing those strings like dicts raises ``TypeError``,
    so they are filtered out here.
    """
    return [
        credit for credit in recording.get('artist-credit', [])
        if isinstance(credit, dict)
    ]


def _credit_name(credit: dict) -> str:
    """Return the credited name of an artist-credit entry.

    Falls back to the nested artist's own name when the entry carries no
    explicit ``name``, and to an empty string when neither is present.
    """
    return credit.get('name') or credit.get('artist', {}).get('name', '')


def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, list[str]]:
    """Find the MusicBrainz recording that best matches *scrobble*.

    Searches MusicBrainz for the track, scores every returned recording on
    title, artist, album and duration similarity, and picks the highest score.
    The raw search response is also dumped to ``listen-brain-request.json``.

    Returns:
        A tuple of ``(track_mbid, album_mbid, artist_mbids)``. ``album_mbid``
        is an empty string when the best recording lists no release.

    Raises:
        ValueError: If the search returns no recordings.
    """
    duration_ms = _scrobble_duration_ms(scrobble)
    search_filters = {}
    if duration_ms:
        search_filters['dur'] = duration_ms

    # A missing release name must not put the text "None" into the query.
    release_name = scrobble.release_name or ''
    query = f"{scrobble.track_name} {release_name}".strip()
    result = musicbrainzngs.search_recordings(
        query, artist=scrobble.artist_name, **search_filters
    )

    with open('listen-brain-request.json', 'w') as file:
        file.write(json.dumps(result, indent=4))

    candidates = result.get('recording-list', [])
    if not candidates:
        raise ValueError("No MusicBrainz results found.")

    def score(candidate: dict) -> float:
        """Score one recording; title and artist count double."""
        total = 2 * similarity(candidate.get('title', ''), scrobble.track_name)

        credited_names = " ".join(
            _credit_name(credit) for credit in _artist_credits(candidate)
        )
        total += 2 * similarity(credited_names, scrobble.artist_name)

        # Only the first listed release is compared against the album name.
        releases = candidate.get('release-list')
        if releases and release_name:
            total += similarity(releases[0].get('title', ''), release_name)

        if duration_ms and 'length' in candidate:
            try:
                diff = abs(int(candidate['length']) - duration_ms)
            except (TypeError, ValueError):
                # Unusable length value: this candidate gets no duration bonus.
                pass
            else:
                total += max(0, 1 - diff / 10000)  # 10s tolerance
        return total

    best = max(candidates, key=score)

    releases = best.get('release-list')
    credits = _artist_credits(best)

    track_mbid: str = best['id']
    track_name: str = best['title']
    album_mbid: str = releases[0]['id'] if releases else ''
    album_name: str = releases[0]['title'] if releases else ''
    artist_mbids: list[str] = [credit['artist']['id'] for credit in credits]
    artist_names: list[str] = [_credit_name(credit) for credit in credits]

    logging.debug(f'Matched scrobble with {track_name=} {album_name=} {artist_names=}')

    return (track_mbid, album_mbid, artist_mbids)


def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble, user_id: int) -> None:
    """Store *scrobble* as a listen event for *user_id*.

    Resolves the scrobble to MusicBrainz IDs, creates the album, artists and
    song rows that are not in the database yet, then inserts the listen event.
    No listen is recorded when the song is unknown and the scrobble carries no
    duration, because the song row cannot be created in that case.

    Raises:
        ValueError: Propagated from ``get_mbid`` when nothing matches.
    """
    track_mbid, album_mbid, artist_mbids = get_mbid(scrobble)

    album = MusicDatabase.search_album_by_mbid(album_mbid)
    if not album:
        album = MusicDatabase.insert_album(scrobble.release_name, album_mbid)

    # Collect every credited artist, whether it already existed or was just
    # inserted; previously artists already in the database were left out, so
    # new songs by known artists were never stored.
    # NOTE(review): assumes search_artist_by_mbid and insert_artist return the
    # same kind of object (with an ``id`` attribute) -- confirm.
    artists: list[MusicDatabase.Artist] = []
    for artist_mbid in artist_mbids:
        artist = MusicDatabase.search_artist_by_mbid(artist_mbid)
        if not artist:
            # NOTE(review): every new artist is stored under the scrobble's
            # combined artist name, since get_mbid returns MBIDs only.
            artist = MusicDatabase.insert_artist(scrobble.artist_name, artist_mbid)
        artists.append(artist)

    # Ensure song exists for each artist (one row per artist)
    track = MusicDatabase.search_song_mbid(track_mbid)
    duration_ms = _scrobble_duration_ms(scrobble)
    if not track and duration_ms:
        song_id: int = MusicDatabase.get_last_song_id() + 1
        for artist in artists:
            MusicDatabase.insert_song(
                song_id, scrobble.track_name, duration_ms,
                album.id, artist.id, track_mbid
            )
        track = MusicDatabase.search_song_mbid(track_mbid)

    # Insert listen event
    if track:
        listened_at = getattr(scrobble, 'listened_at', None)
        if listened_at is None:
            listened_at = getattr(scrobble, 'timestamp', None)
        if listened_at is None:
            # No timestamp on the scrobble: fall back to the current time.
            listened_at = int(time.time())
        MusicDatabase.insert_listen_event(track.id, user_id, listened_at, 0)


# See https://listenbrainz.readthedocs.io/en/latest/users/json.html#json-doc
@app.route('/1/submit-listens', methods=["POST"])
def submit_listens():
    """Handle a ListenBrainz ``submit-listens`` request.

    Returns an empty body with status 401 when the ``Authorization`` header is
    missing or is not of the form ``Token <value>``, 400 when the JSON body is
    empty or lacks ``listen_type`` / a list ``payload``, and 200 once every
    listen in the payload has been recorded.
    """
    authorization = flask.request.headers.get('Authorization')
    if authorization is None:
        return '', 401

    token_match: re.Match[str] | None = re.match(r'Token (.*)', authorization)
    if not token_match:
        return '', 401
    # NOTE(review): the token is parsed but never validated; every listen is
    # attributed to USER_ID.
    user_token: str = token_match.group(1)

    request_json: dict | None = flask.request.json
    if not request_json or not isinstance(request_json, dict):
        return '', 400

    listen_type = request_json.get('listen_type')
    payload = request_json.get('payload')
    if listen_type is None or not isinstance(payload, list):
        return '', 400

    # NOTE(review): this writes the user's credential to the log file;
    # consider redacting it.
    logging.debug("User Token %s", user_token)

    # NOTE(review): listen_type is not acted on, so every payload entry is
    # recorded as a listen regardless of its type -- confirm this is intended.
    for scrobble_data in payload:
        logging.debug("Received scrobble %s", json.dumps(scrobble_data, indent=4))
        scrobble = ListenBrainz.ListenBrainzScrobble.from_json(scrobble_data)
        record_scrobble(scrobble, USER_ID)

    return '', 200


if __name__ == '__main__':
    app.run('127.0.0.1', port=8545)