import logging
import json
import re

import musicbrainzngs
import flask

import ListenBrainz
import MusicDatabase

logging.basicConfig(
    format="%(levelname)s %(module)s::%(filename)s::%(funcName)s %(asctime)s - %(message)s",
    level=logging.DEBUG
)

app = flask.Flask(__name__)
musicbrainzngs.set_useragent("Navidrome Scrobbler", "beta 0.0")


# TODO improve matching instead of assuming the top result is correct
def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, list[str]]:
    """Resolve a scrobble to MusicBrainz IDs using the top search hit.

    Searches MusicBrainz recordings for "<track_name> <release_name>",
    filtered by the scrobble's artist name and, when the scrobble carries a
    duration, by that duration. The first recording returned is taken as the
    match, together with its first release.

    Args:
        scrobble: The listen to look up. Reads ``track_name``,
            ``release_name``, ``artist_name``, ``duration`` and
            ``duration_ms``.

    Returns:
        A tuple ``(track_mbid, album_mbid, artist_mbids)``.

    Raises:
        LookupError: If the search yields no recording, or the top recording
            has no release attached.
    """
    optional_args = {}
    if scrobble.duration:
        # Presumably seconds -> milliseconds, to match `duration_ms` below.
        optional_args['dur'] = scrobble.duration * 1000
    elif scrobble.duration_ms:
        optional_args['dur'] = scrobble.duration_ms

    result = musicbrainzngs.search_recordings(
        f"{scrobble.track_name} {scrobble.release_name}",
        artist=scrobble.artist_name,
        **optional_args,
    )

    # Debugging aid: keep the raw response of the latest search on disk.
    with open('listen-brain-request.json', 'w') as file:
        json.dump(result, file, indent=4)

    recordings = result.get('recording-list') or []
    if not recordings:
        raise LookupError(
            f"No MusicBrainz recording found for {scrobble.track_name!r} "
            f"by {scrobble.artist_name!r}"
        )
    top_result: dict = recordings[0]

    releases = top_result.get('release-list') or []
    if not releases:
        raise LookupError(
            f"MusicBrainz recording {top_result.get('id')!r} has no release"
        )

    track_mbid: str = top_result['id']
    track_name: str = top_result['title']
    album_mbid: str = releases[0]['id']
    album_name: str = releases[0]['title']

    # musicbrainzngs interleaves plain join-phrase strings (" & ", " feat. ")
    # with the credit dicts; only the dicts describe an artist.
    artist_credits = [
        credit for credit in top_result.get('artist-credit', [])
        if isinstance(credit, dict)
    ]
    artist_mbids: list[str] = [credit['artist']['id'] for credit in artist_credits]
    # A credit only has its own 'name' when one was supplied; otherwise fall
    # back to the artist's name.
    artist_names: list[str] = [
        credit.get('name') or credit['artist'].get('name', '')
        for credit in artist_credits
    ]

    logging.debug(f'Matched scrobble with {track_name=} {album_name=} {artist_names=}')

    return (track_mbid, album_mbid, artist_mbids)


def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble) -> None:
    """Store the album, artists and song of a scrobble in the music database.

    Each entity is looked up by its MusicBrainz ID and inserted only when it
    is not already present. The song is inserted only when it is unknown and
    the scrobble carries ``duration_ms``.

    Args:
        scrobble: The listen to record.

    Raises:
        LookupError: Propagated from ``get_mbid`` when no match is found.
    """
    track_mbid, album_mbid, artist_mbids = get_mbid(scrobble)

    album = MusicDatabase.search_album_by_mbid(album_mbid)
    if not album:
        album = MusicDatabase.insert_album(scrobble.release_name, album_mbid)

    # Resolve every credited artist, whether it already existed or was just
    # inserted, so a usable artist is always available for the song below.
    artists: list[MusicDatabase.Artist] = []
    for artist_mbid in artist_mbids:
        artist = MusicDatabase.search_artist_by_mbid(artist_mbid)
        if not artist:
            artist = MusicDatabase.insert_artist(scrobble.artist_name, artist_mbid)
        artists.append(artist)

    if MusicDatabase.search_song_mbid(track_mbid) or not scrobble.duration_ms:
        return

    if not artists:
        logging.warning(
            "No artist resolved for track %s; song not inserted", track_mbid
        )
        return

    # The song is inserted with a single artist id; as before, the last
    # credited artist is used.
    # NOTE(review): confirm whether the first (primary) credit is preferable.
    song_artist = artists[-1]
    song_id: int = MusicDatabase.get_last_song_id() + 1
    MusicDatabase.insert_song(
        song_id,
        scrobble.track_name,
        scrobble.duration_ms,
        album.id,
        song_artist.id,
        track_mbid,
    )


# See https://listenbrainz.readthedocs.io/en/latest/users/json.html#json-doc
@app.route('/1/submit-listens', methods=["POST"])
def submit_listens():
    """Handle a ListenBrainz-style ``submit-listens`` POST request.

    Returns:
        ``('', 401)`` when the ``Authorization: Token <token>`` header is
        missing or malformed, ``('', 400)`` when the JSON body is empty or
        lacks ``listen_type`` / a ``payload`` list, and ``('', 200)``
        otherwise.
    """
    if 'Authorization' not in flask.request.headers:
        return '', 401

    user_token_match: re.Match[str] | None = re.match(
        r'Token (.*)', flask.request.headers['Authorization']
    )
    if not user_token_match:
        return '', 401
    user_token: str = user_token_match.group(1)

    # A missing or malformed body is a client error, not an auth failure.
    request_json = flask.request.json
    if not request_json or not isinstance(request_json, dict):
        return '', 400

    payload = request_json.get('payload')
    if 'listen_type' not in request_json or not isinstance(payload, list):
        return '', 400

    # NOTE(review): this writes the user's token to the debug log.
    logging.debug("User Token %s", user_token)

    for scrobble_data in payload:
        logging.debug("Received scrobble %s", json.dumps(scrobble_data, indent=4))
        # NOTE(review): the parsed scrobble is not passed to record_scrobble()
        # yet - confirm whether that is intentional.
        ListenBrainz.ListenBrainzScrobble.from_json(scrobble_data)

    return '', 200


if __name__ == '__main__':
    app.run('127.0.0.1', port=8545)