Navidrome-Tracker/SubSonic.py
2025-04-15 16:35:13 -05:00

480 lines
14 KiB
Python

import time
import random
import string
import hashlib
import logging
import urllib.parse
import requests
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Playlist:
id: str
name: str
songCount: int
duration: str
owner: str
coverArtId: str
def __init__(self, info: dict):
self.id = info['id']
self.name = info['name']
self.songCount = int(info['songCount'])
self.duration = time.strftime('%H:%M:%S', time.gmtime(int(info['duration'])))
self.owner = info['owner']
self.coverArtId = info['coverArt']
class Album:
id: str
title: str
name: str
album: str
coverArtId: str
duration: int
artistId: str
songCount: int
playCount: int
year: int | None
duration: str
def __init__(self, album_info: dict):
self.id = album_info['id']
self.title = album_info['title']
self.name = album_info['name']
self.album = album_info['album']
self.coverArtId = album_info['coverArt']
self.duration = int(album_info['duration'])
self.artistId = album_info['artistId']
self.songCount = int(album_info['songCount'])
self.playCount = int(album_info['playCount']) if 'playCount' in album_info else 0
self.year = int(album_info['year']) if 'year' in album_info else None
self.duration = self._secondToMinutes(int(album_info['duration']))
def _secondToMinutes(self, s: int) -> str:
return f'{s // 60}:{s % 60 if s % 60 > 9 else "0" + str(s % 60)}'
class Song:
id: str
title: str
album: str
artist: str
trackNumber: int
contentType: str
suffix: str
duration: int
bitRate: int
path: str
discNumber: int
albumId: str
artistId: str
comment: str
channelCount: int
samplingRate: int
playCount: int | None
coverArtId: str | None
def __init__(self, info: dict | None = {}):
self.id = info['id'] if 'id' in info else ""
self.title = info['title'] if 'title' in info else ""
self.album = info['album'] if 'album' in info else ""
self.artist = info['artist'] if 'artist' in info else ""
self.trackNumber = int(info['track']) if 'track' in info else ""
self.contentType = info['contentType'] if 'contentType' in info else ""
self.suffix = info['suffix'] if 'suffix' in info else ""
self.duration = int(info['duration']) if 'duration' in info else ""
self.bitRate = int(info['bitRate']) if 'bitRate' in info else ""
self.path = info['path'] if 'path' in info else ""
self.discNumber = int(info['discNumber']) if 'discNumber' in info else ""
self.albumId = info['albumId'] if 'albumId' in info else ""
self.artistId = info['artistId'] if 'artistId' in info else ""
self.comment = info['comment'] if 'comment' in info else ""
self.channelCount = int(info['channelCount']) if 'channelCount' in info else ""
self.samplingRate = int(info['samplingRate']) if 'samplingRate' in info else ""
self.playCount = int(info['playCount']) if 'playCount' in info else ""
self.coverArtId = info['coverArt'] if 'coverArt' in info else None
class Artist:
id: str
name: str
albumCount: int
coverArt: str
artistImageUrl: str
def __init__(self, id: str, name: str, albumCount: str | int, coverArt: str, artistImageUrl: str):
self.id = id
self.name = name
self.albumCount = int(albumCount)
self.coverArt = coverArt
self.artistImageUrl = artistImageUrl
def from_dict(info: dict):
return Artist(
info['id'] if 'id' in info else None,
info['name'] if 'name' in info else None,
info['albumCount'] if 'albumCount' in info else None,
info['coverArt'] if 'coverArt' in info else None,
info['artistImageUrl'] if 'artistImageUrl' in info else None
)
class UserNowPlaying:
    """Field declarations for a "now playing" entry.

    Only annotations are declared: there is no constructor and no default
    values, so instances carry none of these attributes until assigned.
    NOTE(review): the code reviewed here never instantiates this class;
    presumably it mirrors one entry of a getNowPlaying response -- confirm
    against the caller.
    """
    id: str
    title: str
    album: str
    artist: str
    track: int
    year: int
    coverArt: str | None
    duration: int
class BadSubsonicResponse(Exception):
    """Raised when a Subsonic response carries a status other than 'ok'."""
class SubpyConn:
username: str
password: str
base_url: str
salt: str
salt_len: int = 8
version: str = "1.16.1"
client_name: str = "testing"
def __init__(self, u: str, p: str, base_url: str, salt_len: int | None = 8):
self.username = u
self.password = p
self.base_url = base_url
self.salt_len = salt_len
def _saltGenerator(self) -> None:
self.salt = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(self.salt_len))
def _hashPass(self) -> str:
self._saltGenerator()
return hashlib.md5((self.password + self.salt).encode('utf-8')).hexdigest()
def _getBasePayload(self) -> dict:
return {
"u" : self.username,
"t" : self._hashPass(),
"s" : self.salt,
"v" : self.version,
"c" : self.client_name,
"f": "json"
}
def _encodePayload(self, payload: dict) -> str:
return urllib.parse.urlencode(payload, quote_via=urllib.parse.quote_plus)
def _getMusicDirectory(self, id: str):
url = self.base_url + '/rest/getMusicDirectory?'
payload = self._getBasePayload()
payload['id'] = id
return requests.get(url + self._encodePayload(payload))
def getArtistAlbums(self, artist_id: str) -> list[Album]:
response: requests.Response = self._getMusicDirectory(artist_id).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
albums: list[Album] = []
for item in response['directory']['child']:
albums.append(Album(item))
return albums
def getAlbumSongs(self, album_id: str) -> list[Song]:
response: requests.Response = self._getMusicDirectory(album_id).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
songs: list[Song] = []
for item in response['directory']['child']:
songs.append(Song(item))
return songs
def getArtists(self) -> list[Artist]:
url = self.base_url + '/rest/getArtists?'
payload = self._getBasePayload()
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
artists: list[Artist] = []
for artist_index in response['artists']['index']:
for artist in artist_index['artist']:
artists.append(Artist.from_dict(artist))
return artists
def getAlbums(self, size: int | None = 25, offset: int | None = 0) -> list[Album]:
url = self.base_url + '/rest/getAlbumList?'
payload = self._getBasePayload()
payload['type'] = 'alphabeticalByName'
payload['size'] = size
payload['offset'] = offset
r = requests.get(url + self._encodePayload(payload))
json_r = r.json()
albums: list[Album] = []
if 'album' not in json_r['subsonic-response']['albumList']:
return []
for album in json_r['subsonic-response']['albumList']['album']:
albums.append(Album(album))
return albums
def getAllAlbums(self) -> list[Album]:
all_albums: list[Album] = []
offset = 0
while albums := self.getAlbums(50, offset):
all_albums.extend(albums)
offset += 50
return all_albums
def getPlaylists(self) -> list[Playlist]:
url = self.base_url + "/rest/getPlaylists?"
payload = self._getBasePayload()
r = requests.get(url + self._encodePayload(payload))
json_r = r.json()
playlists: list[Playlist] = []
for playlist in json_r['subsonic-response']['playlists']['playlist']:
playlists.append(Playlist(playlist))
return playlists
def getPlaylist(self, playlist_id: str) -> list[Song]:
url = self.base_url + "/rest/getPlaylist?"
payload = self._getBasePayload()
payload['id'] = playlist_id
r = requests.get(url + self._encodePayload(payload))
json_r = r.json()
songs: list[Song] = []
for song in json_r['subsonic-response']['playlist']['entry']:
songs.append(Song(song))
return songs
def getStarred(self) -> list[Album | Song | Artist]:
url = self.base_url + "/rest/getStarred?"
payload = self._getBasePayload()
r = requests.get(url + self._encodePayload(payload))
json_r = r.json()
starred: list[Album | Song | Artist] = []
for album in json_r['subsonic-response']['starred']['artist']:
starred.append(Album(album))
for artist in json_r['subsonic-response']['starred']['artist']:
starred.append(Artist.from_dict(artist))
for song in json_r['subsonic-response']['starred']['song']:
starred.append(Song(song))
return starred
def getStarredSongs(self) -> list[Song]:
url = self.base_url + "/rest/getStarred?"
payload = self._getBasePayload()
r = requests.get(url + self._encodePayload(payload))
json_r = r.json()
starred: list[Song] = []
for song in json_r['subsonic-response']['starred']['song']:
starred.append(Song(song))
return starred
# Returns a requests Response object, the data stream can be found in Response.content
def stream(self, song_id: str) -> requests.Response:
url = self.base_url + '/rest/stream?'
payload = self._getBasePayload()
payload['id'] = song_id
return requests.get(url + self._encodePayload(payload), stream=True)
# Saves cover art id
def getCoverArt(self, cover_art_id: str, path: str | None = None) -> None:
url = self.base_url + '/rest/getCoverArt?'
payload = self._getBasePayload()
payload['id'] = cover_art_id
r = requests.get(url + self._encodePayload(payload))
with open(path if path else 'cover.png', 'wb') as file:
file.write(r.content)
def getCoverArtRaw(self, cover_art_id: str) -> bytes:
url = self.base_url + '/rest/getCoverArt?'
payload = self._getBasePayload()
payload['id'] = cover_art_id
r = requests.get(url + self._encodePayload(payload))
return r.content
def getSong(self, song_id: str) -> Song:
url = self.base_url + '/rest/getSong?'
payload = self._getBasePayload()
payload['id'] = song_id
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
return Song(response['song'])
def star(self, id: str) -> None:
url = self.base_url + '/rest/star?'
payload = self._getBasePayload()
payload['id'] = id
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
def unstar(self, id: str) -> None:
url = self.base_url + '/rest/unstar?'
payload = self._getBasePayload()
payload['id'] = id
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
"""
https://opensubsonic.netlify.app/docs/endpoints/scrobble/
id: str - unique identifier of the media to scrobble
OPTIONAL time: int - posix time of when the song was listened to
OPTIONAL submission: bool - Whether this is a "submission" or a "now playing" notification. Default: True
"""
def scrobble(self, id: str, time: int | None = None, submission: bool | None = True) -> None:
url = self.base_url + '/rest/scrobble?'
payload = self._getBasePayload()
payload['id'] = id
if time:
payload['time'] = time
payload['submission'] = submission
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
"""
https://opensubsonic.netlify.app/docs/endpoints/getrandomsongs/
"""
def getRandomSongs(self,
size: int | None = 10,
genre: str | None = None,
fromYear: str | int | None = None,
toYear: str | int | None = None,
musicFolderId: str | int | None = None
) -> list[Song]:
url = self.base_url + '/rest/getRandomSongs?'
payload = self._getBasePayload()
# The default being 10 is only to make the method match the internal docs
if size and size != 10:
payload['size'] = size
if genre:
payload['genre'] = genre
if fromYear:
payload['fromYear'] = fromYear
if toYear:
payload['toYear'] = toYear
if musicFolderId:
payload['musicFolderId'] = musicFolderId
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
if response['status'] != 'ok':
raise BadSubsonicResponse()
random_songs: list[Song] = []
for song in response['randomSongs']['song']:
random_songs.append(Song(song))
return random_songs
def getUser(self, username: str) -> None:
url = self.base_url + '/rest/getUser?'
payload = self._getBasePayload()
payload['username'] = username
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
return response
def getNowPlaying(self) -> dict:
url = self.base_url + "/rest/getNowPlaying?"
payload = self._getBasePayload()
response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response']
return response
def getArtist(self, artist_id: str, raw_json: bool | None = False) -> dict:
url = self.base_url + "/rest/getArtist?"
payload = self._getBasePayload()
payload['id'] = artist_id
response = requests.get(url + self._encodePayload(payload))
if raw_json:
response = response.json()['subsonic-response']
return response