143 lines
5.2 KiB
Python
143 lines
5.2 KiB
Python
import difflib
|
|
import logging
|
|
import json
|
|
import re
|
|
|
|
import musicbrainzngs
|
|
import flask
|
|
|
|
import ListenBrainz
|
|
import MusicDatabase
|
|
|
|
# Route every record from DEBUG upwards into a log file in the working directory.
logging.basicConfig(
    filename="Hey-Listen.log",
    level=logging.DEBUG,
    format="%(levelname)s %(module)s::%(filename)s::%(funcName)s %(asctime)s - %(message)s",
)

# TODO : Do something about this
# Every listen is currently attributed to this single hard-coded user id; the
# token parsed in submit_listens() is not mapped to a user yet.
USER_ID = 1

# Flask application that exposes the ListenBrainz-compatible endpoint(s).
app = flask.Flask(__name__)

# musicbrainzngs needs a user agent to be set before any web-service call.
musicbrainzngs.set_useragent("Navidrome Scrobbler", "beta 0.0")
|
|
|
|
# NOTE: get_mbid() (defined below) returns a tuple of (track_mbid, album_mbid, artist_mbids: list)
|
|
def similarity(a: str, b: str) -> float:
|
|
return difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
|
|
|
|
def _credited_artists(recording: dict) -> list[dict]:
    """Return only the dict entries of a recording's ``artist-credit`` list.

    musicbrainzngs interleaves join phrases (plain strings such as ``" & "``)
    between the per-artist dicts; indexing those strings like dicts raises
    ``TypeError``, so they are filtered out here.
    """
    return [credit for credit in recording.get('artist-credit', []) if isinstance(credit, dict)]


def _credited_name(credit: dict) -> str:
    """Return the credited name of one artist-credit entry, falling back to the artist's own name."""
    return credit.get('name') or credit.get('artist', {}).get('name', '')


def get_mbid(scrobble: ListenBrainz.ListenBrainzScrobble) -> tuple[str, str, list[str]]:
    """Find the MusicBrainz recording that best matches *scrobble*.

    Searches MusicBrainz recordings with the scrobble's track, release and
    artist names (plus the duration when known), scores every returned
    candidate and picks the highest-scoring one.

    Returns:
        A tuple ``(track_mbid, album_mbid, artist_mbids)``. ``album_mbid`` is
        ``''`` when the chosen recording lists no release.

    Raises:
        ValueError: if the search returns no recordings.
    """
    # Normalise the duration to milliseconds once so that the search filter and
    # the scoring below agree, whichever of the two fields the scrobble carries.
    if scrobble.duration:
        duration_ms = scrobble.duration * 1000
    elif scrobble.duration_ms:
        duration_ms = scrobble.duration_ms
    else:
        duration_ms = None

    optional_args = {}
    if duration_ms:
        optional_args['dur'] = duration_ms

    # The release name is optional in ListenBrainz payloads; do not let a
    # missing one leak into the query as the literal text "None".
    release_name = scrobble.release_name or ''
    query = " ".join(part for part in (scrobble.track_name, release_name) if part)
    result = musicbrainzngs.search_recordings(query, artist=scrobble.artist_name, **optional_args)

    # Debugging aid: dump the raw search response (overwritten on every call).
    with open('listen-brain-request.json', 'w') as file:
        file.write(json.dumps(result, indent=4))

    candidates = result.get('recording-list', [])
    if not candidates:
        raise ValueError("No MusicBrainz results found.")

    def score(candidate):
        """Score one candidate: title and artist weigh 2 each, album and duration up to 1 each."""
        total = 0
        # Title similarity
        total += similarity(candidate.get('title', ''), scrobble.track_name) * 2
        # Artist similarity
        credited = " ".join(_credited_name(credit) for credit in _credited_artists(candidate))
        total += similarity(credited, scrobble.artist_name) * 2
        # Album similarity (only the first listed release is considered)
        if candidate.get('release-list'):
            total += similarity(candidate['release-list'][0].get('title', ''), release_name)
        # Duration similarity (if available)
        if duration_ms and 'length' in candidate:
            try:
                mb_length = int(candidate['length'])
                diff = abs(mb_length - duration_ms)
                total += max(0, 1 - diff / 10000)  # 10s tolerance
            except (TypeError, ValueError):
                # Unparsable length: simply contribute nothing for duration.
                pass
        return total

    best = max(candidates, key=score)

    releases = best.get('release-list') or []
    credits = _credited_artists(best)

    track_mbid: str = best['id']
    track_name: str = best['title']
    album_mbid: str = releases[0]['id'] if releases else ''
    album_name: str = releases[0]['title'] if releases else ''
    artist_mbids: list[str] = [credit['artist']['id'] for credit in credits]
    artist_names: list[str] = [_credited_name(credit) for credit in credits]

    logging.debug(f'Matched scrobble with {track_name=} {album_name=} {artist_names=}')
    return (track_mbid, album_mbid, artist_mbids)
|
|
|
|
def record_scrobble(scrobble: ListenBrainz.ListenBrainzScrobble, user_id: int) -> None:
    """Store one scrobble for *user_id* in the music database.

    Resolves the scrobble to MusicBrainz ids via ``get_mbid``, makes sure the
    album, every credited artist and the song exist in the database, and then
    inserts a listen event for the song.

    Raises:
        ValueError: propagated from ``get_mbid`` when MusicBrainz has no match.
    """
    import time

    track_mbid, album_mbid, artist_mbids = get_mbid(scrobble)

    album = MusicDatabase.search_album_by_mbid(album_mbid)
    if not album:
        album = MusicDatabase.insert_album(scrobble.release_name, album_mbid)

    # Collect every credited artist, whether it already existed or was just
    # inserted; previously only freshly inserted artists were tracked reliably.
    artists: list[MusicDatabase.Artist] = []
    for artist_mbid in artist_mbids:
        artist = MusicDatabase.search_artist_by_mbid(artist_mbid)
        if not artist:
            # NOTE(review): the scrobble's artist string is used for every new
            # artist row, even for multi-artist credits - get_mbid only returns ids.
            artist = MusicDatabase.insert_artist(scrobble.artist_name, artist_mbid)
        artists.append(artist)

    # Accept the duration in either unit, mirroring get_mbid (seconds -> ms).
    duration_ms = scrobble.duration_ms
    if not duration_ms and scrobble.duration:
        duration_ms = scrobble.duration * 1000

    # Ensure song exists for each artist (one row per artist)
    track = MusicDatabase.search_song_mbid(track_mbid)
    if not track and duration_ms:
        # NOTE(review): "last id + 1" is not safe under concurrent requests.
        song_id: int = MusicDatabase.get_last_song_id() + 1
        for artist in artists:
            MusicDatabase.insert_song(song_id, scrobble.track_name, duration_ms, album.id, artist.id, track_mbid)
        track = MusicDatabase.search_song_mbid(track_mbid)

    if not track:
        # Nothing to attach the listen to (no duration or no artists); say so
        # instead of dropping the listen silently.
        logging.warning("No song row for recording %s; listen not recorded", track_mbid)
        return

    # Insert listen event: prefer listened_at, then timestamp, then "now".
    listened_at = getattr(scrobble, 'listened_at', None)
    if listened_at is None:
        listened_at = getattr(scrobble, 'timestamp', None)
    if listened_at is None:
        listened_at = int(time.time())
    MusicDatabase.insert_listen_event(track.id, user_id, listened_at, 0)
|
|
|
|
# See https://listenbrainz.readthedocs.io/en/latest/users/json.html#json-doc
|
|
@app.route('/1/submit-listens', methods=["POST"])
def submit_listens():
    """Handle a ListenBrainz-style ``submit-listens`` POST request.

    Expects an ``Authorization: Token <token>`` header and a JSON object with
    ``listen_type`` and a ``payload`` list. Each payload entry is parsed into a
    ``ListenBrainzScrobble`` and recorded for ``USER_ID``.

    Returns:
        ``('', 401)`` when the Authorization header is missing or malformed,
        ``('', 400)`` when the body is not a JSON object with ``listen_type``
        and a ``payload`` list, otherwise ``('', 200)``.
    """
    auth_header = flask.request.headers.get('Authorization')
    if auth_header is None:
        return '', 401

    user_token_match: re.Match[str] | None = re.match(r'Token (.*)', auth_header)
    if not user_token_match:
        return '', 401
    # NOTE(review): the token is only parsed, never validated or mapped to a user.
    user_token: str = user_token_match.group(1)

    # A missing or malformed body is a client error, not an authentication failure.
    request_json = flask.request.json
    if not isinstance(request_json, dict):
        return '', 400

    listen_type = request_json.get('listen_type')
    payload = request_json.get('payload')
    if listen_type is None or not isinstance(payload, list):
        return '', 400

    # logging uses %-style lazy formatting: extra args need a placeholder.
    logging.debug("User Token %s", user_token)
    # NOTE(review): listen_type is not acted upon; "playing_now" submissions are
    # recorded like regular listens - confirm whether that is intended.
    logging.debug("Listen type %s", listen_type)

    for scrobble_data in payload:
        logging.debug("Received scrobble %s", json.dumps(scrobble_data, indent=4))

        scrobble = ListenBrainz.ListenBrainzScrobble.from_json(scrobble_data)

        try:
            record_scrobble(scrobble, USER_ID)
        except ValueError:
            # get_mbid raises ValueError when MusicBrainz has no match; skip that
            # entry rather than failing the rest of the batch with a 500.
            logging.warning("Could not record scrobble; skipping it", exc_info=True)

    return '', 200
|
|
|
|
if __name__ == '__main__':
    # Script entry point: serve the app on the loopback interface only, port 8545.
    app.run(host='127.0.0.1', port=8545)
|