import time
import random
import string
import hashlib
import logging
import urllib.parse

import requests

logger = logging.getLogger(__name__)


def _intOrBlank(info: dict, key: str) -> "int | str":
    """Return ``int(info[key])`` when *key* is present, otherwise ``""``."""
    return int(info[key]) if key in info else ""


class Playlist:
    """Playlist summary built from one playlist dict of a server response."""

    id: str
    name: str
    songCount: int
    duration: str  # formatted as HH:MM:SS
    owner: str
    coverArtId: str | None

    def __init__(self, info: dict):
        """Copy the fields of *info* onto the instance.

        Raises:
            KeyError: if 'id', 'name', 'songCount', 'duration' or 'owner'
                is missing from *info*.
        """
        self.id = info['id']
        self.name = info['name']
        self.songCount = int(info['songCount'])
        # Format by hand: time.strftime/time.gmtime wrap around at 24 hours,
        # which would misreport playlists longer than a day.
        minutes, seconds = divmod(int(info['duration']), 60)
        hours, minutes = divmod(minutes, 60)
        self.duration = f'{hours:02d}:{minutes:02d}:{seconds:02d}'
        self.owner = info['owner']
        # A missing cover is stored as None instead of raising KeyError.
        self.coverArtId = info.get('coverArt')


class Album:
    """Album entry built from one album/directory-child dict of a response."""

    id: str
    title: str
    name: str
    album: str
    coverArtId: str | None
    artistId: str
    songCount: int
    playCount: int
    year: int | None
    duration: str  # formatted as M:SS

    def __init__(self, album_info: dict):
        """Copy the fields of *album_info* onto the instance.

        'playCount' defaults to 0 and 'year' to None when absent.

        Raises:
            KeyError: if any of the other required keys is missing.
        """
        self.id = album_info['id']
        self.title = album_info['title']
        self.name = album_info['name']
        self.album = album_info['album']
        self.coverArtId = album_info.get('coverArt')
        self.duration = self._secondToMinutes(int(album_info['duration']))
        self.artistId = album_info['artistId']
        self.songCount = int(album_info['songCount'])
        self.playCount = int(album_info['playCount']) if 'playCount' in album_info else 0
        self.year = int(album_info['year']) if 'year' in album_info else None

    def _secondToMinutes(self, s: int) -> str:
        """Format *s* seconds as ``minutes:SS`` with zero-padded seconds."""
        return f'{s // 60}:{s % 60:02d}'


class Song:
    """Song entry built from one song dict of a server response.

    Every field is optional. Missing text and numeric fields are stored as
    an empty string; a missing 'coverArt' is stored as None.
    """

    id: str
    title: str
    album: str
    artist: str
    trackNumber: int | str
    contentType: str
    suffix: str
    duration: int | str
    bitRate: int | str
    path: str
    discNumber: int | str
    albumId: str
    artistId: str
    comment: str
    channelCount: int | str
    samplingRate: int | str
    playCount: int | str
    coverArtId: str | None

    def __init__(self, info: dict | None = None):
        """Populate the song from *info*; None is treated as an empty dict."""
        # A fresh dict per call avoids sharing one mutable default object.
        if info is None:
            info = {}

        self.id = info.get('id', "")
        self.title = info.get('title', "")
        self.album = info.get('album', "")
        self.artist = info.get('artist', "")
        self.trackNumber = _intOrBlank(info, 'track')
        self.contentType = info.get('contentType', "")
        self.suffix = info.get('suffix', "")
        self.duration = _intOrBlank(info, 'duration')
        self.bitRate = _intOrBlank(info, 'bitRate')
        self.path = info.get('path', "")
        self.discNumber = _intOrBlank(info, 'discNumber')
        self.albumId = info.get('albumId', "")
        self.artistId = info.get('artistId', "")
        self.comment = info.get('comment', "")
        self.channelCount = _intOrBlank(info, 'channelCount')
        self.samplingRate = _intOrBlank(info, 'samplingRate')
        self.playCount = _intOrBlank(info, 'playCount')
        self.coverArtId = info.get('coverArt')


class Artist:
    """Artist entry; build it directly or from a response dict via from_dict."""

    id: str
    name: str
    albumCount: int
    coverArt: str
    artistImageUrl: str

    def __init__(self, id: str, name: str, albumCount: "str | int | None",
                 coverArt: str, artistImageUrl: str):
        """Store the given fields; a None *albumCount* is stored as 0."""
        self.id = id
        self.name = name
        # from_dict passes None when the key is absent; int(None) would raise.
        self.albumCount = int(albumCount) if albumCount is not None else 0
        self.coverArt = coverArt
        self.artistImageUrl = artistImageUrl

    @staticmethod
    def from_dict(info: dict) -> "Artist":
        """Build an Artist from *info*, using None for every missing key."""
        return Artist(
            info.get('id'),
            info.get('name'),
            info.get('albumCount'),
            info.get('coverArt'),
            info.get('artistImageUrl'),
        )


class UserNowPlaying:
    """Annotation-only record; no code in this module instantiates it."""

    id: str
    title: str
    album: str
    artist: str
    track: int
    year: int
    coverArt: str | None
    duration: int


class BadSubsonicResponse(Exception):
    """Raised when a response's 'status' field is not 'ok'."""
    pass


class SubpyConn:
    """Client for the REST endpoints under ``<base_url>/rest/``.

    Each request is authenticated with the username, an MD5 token of
    ``password + salt`` and the salt itself; a new salt is generated for
    every request.
    """

    username: str
    password: str
    base_url: str
    salt: str
    salt_len: int = 8
    version: str = "1.16.1"
    client_name: str = "testing"

    def __init__(self, u: str, p: str, base_url: str, salt_len: int | None = 8):
        """Store credentials and server location.

        Args:
            u: user name.
            p: password (kept in memory; only its salted hash is sent).
            base_url: server root, e.g. ``http://host:4533``.
            salt_len: number of salt characters; None keeps the class default.
        """
        self.username = u
        self.password = p
        self.base_url = base_url
        # None would break range() in _saltGenerator, so fall back to the
        # class-level default instead of storing it.
        if salt_len is not None:
            self.salt_len = salt_len

    def _saltGenerator(self) -> None:
        """Store a fresh random salt of ``salt_len`` characters in self.salt."""
        rng = random.SystemRandom()
        alphabet = string.ascii_uppercase + string.digits
        self.salt = ''.join(rng.choice(alphabet) for _ in range(self.salt_len))

    def _hashPass(self) -> str:
        """Generate a new salt and return the MD5 hex digest of password+salt."""
        self._saltGenerator()
        return hashlib.md5((self.password + self.salt).encode('utf-8')).hexdigest()

    def _getBasePayload(self) -> dict:
        """Return the query parameters sent with every request."""
        # Hash first: _hashPass regenerates self.salt, and the token and the
        # salt sent to the server must belong together.
        token = self._hashPass()
        return {
            "u": self.username,
            "t": token,
            "s": self.salt,
            "v": self.version,
            "c": self.client_name,
            "f": "json",
        }

    def _encodePayload(self, payload: dict) -> str:
        """URL-encode *payload* as a query string."""
        return urllib.parse.urlencode(payload, quote_via=urllib.parse.quote_plus)

    def _request(self, endpoint: str, params: dict | None = None,
                 **kwargs) -> requests.Response:
        """GET ``<base_url>/rest/<endpoint>`` with auth and *params* applied.

        Extra keyword arguments are forwarded to ``requests.get``.
        """
        payload = self._getBasePayload()
        if params:
            payload.update(params)
        # Strip a trailing slash so the URL never contains '//rest/'.
        url = self.base_url.rstrip('/') + '/rest/' + endpoint + '?'
        return requests.get(url + self._encodePayload(payload), **kwargs)

    def _requestJson(self, endpoint: str, params: dict | None = None) -> dict:
        """Return the 'subsonic-response' object of a call, status unchecked."""
        return self._request(endpoint, params).json()['subsonic-response']

    @staticmethod
    def _checkStatus(response: dict) -> dict:
        """Return *response* unchanged, or raise if its status is not 'ok'.

        Raises:
            BadSubsonicResponse: carrying the 'code' and 'message' of the
                response's 'error' object when those are present.
        """
        if response.get('status') != 'ok':
            error = response.get('error') or {}
            raise BadSubsonicResponse(
                f"{error.get('code', 'unknown')}: "
                f"{error.get('message', 'request failed')}"
            )
        return response

    def _requestChecked(self, endpoint: str, params: dict | None = None) -> dict:
        """Like _requestJson, but raise BadSubsonicResponse on a failed status."""
        return self._checkStatus(self._requestJson(endpoint, params))

    def _getMusicDirectory(self, id: str):
        """Return the raw HTTP response of getMusicDirectory for *id*."""
        return self._request('getMusicDirectory', {'id': id})

    def _getDirectoryChildren(self, id: str) -> list[dict]:
        """Return the child dicts of directory *id* (empty list if none)."""
        response = self._checkStatus(
            self._getMusicDirectory(id).json()['subsonic-response']
        )
        return response.get('directory', {}).get('child', [])

    def getArtistAlbums(self, artist_id: str) -> list[Album]:
        """Return the albums listed in the artist's music directory.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        return [Album(item) for item in self._getDirectoryChildren(artist_id)]

    def getAlbumSongs(self, album_id: str) -> list[Song]:
        """Return the songs listed in the album's music directory.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        return [Song(item) for item in self._getDirectoryChildren(album_id)]

    def getArtists(self) -> list[Artist]:
        """Return every artist from all index groups of getArtists.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        response = self._requestChecked('getArtists')
        artists: list[Artist] = []
        for artist_index in response.get('artists', {}).get('index', []):
            for artist in artist_index.get('artist', []):
                artists.append(Artist.from_dict(artist))
        return artists

    def getAlbums(self, size: int | None = 25, offset: int | None = 0) -> list[Album]:
        """Return one page of albums ordered alphabetically by name.

        Args:
            size: page size sent to the server.
            offset: index of the first album of the page.

        Returns:
            The page's albums; an empty list when the page has none.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        response = self._requestChecked('getAlbumList', {
            'type': 'alphabeticalByName',
            'size': size,
            'offset': offset,
        })
        return [Album(album) for album in response.get('albumList', {}).get('album', [])]

    def getAllAlbums(self) -> list[Album]:
        """Return all albums by paging through getAlbums 50 at a time."""
        all_albums: list[Album] = []
        offset = 0
        # An empty page marks the end of the listing.
        while albums := self.getAlbums(50, offset):
            all_albums.extend(albums)
            offset += 50
        return all_albums

    def getPlaylists(self) -> list[Playlist]:
        """Return the playlists reported by getPlaylists (possibly empty).

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        response = self._requestChecked('getPlaylists')
        return [Playlist(playlist)
                for playlist in response.get('playlists', {}).get('playlist', [])]

    def getPlaylist(self, playlist_id: str) -> list[Song]:
        """Return the songs of the given playlist (possibly empty).

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        response = self._requestChecked('getPlaylist', {'id': playlist_id})
        return [Song(song) for song in response.get('playlist', {}).get('entry', [])]

    def getStarred(self) -> list[Album | Song | Artist]:
        """Return starred albums, then artists, then songs, in one list.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        starred_json = self._requestChecked('getStarred').get('starred', {})
        starred: list[Album | Song | Artist] = []
        # Albums come from the 'album' list, not from the 'artist' list.
        starred.extend(Album(album) for album in starred_json.get('album', []))
        starred.extend(Artist.from_dict(artist)
                       for artist in starred_json.get('artist', []))
        starred.extend(Song(song) for song in starred_json.get('song', []))
        return starred

    def getStarredSongs(self) -> list[Song]:
        """Return only the starred songs (possibly empty).

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        starred_json = self._requestChecked('getStarred').get('starred', {})
        return [Song(song) for song in starred_json.get('song', [])]

    def stream(self, song_id: str) -> requests.Response:
        """Return the streaming HTTP response for a song.

        The request is made with ``stream=True``; the data can be read from
        the returned Response object (e.g. ``Response.content``).
        """
        return self._request('stream', {'id': song_id}, stream=True)

    def getCoverArt(self, cover_art_id: str, path: str | None = None) -> None:
        """Download a cover image and write it to *path* ('cover.png' if unset)."""
        r = self._request('getCoverArt', {'id': cover_art_id})
        with open(path if path else 'cover.png', 'wb') as file:
            file.write(r.content)

    def getCoverArtRaw(self, cover_art_id: str) -> bytes:
        """Return the raw body of the getCoverArt response."""
        return self._request('getCoverArt', {'id': cover_art_id}).content

    def getSong(self, song_id: str) -> Song:
        """Return the song with the given id.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        return Song(self._requestChecked('getSong', {'id': song_id})['song'])

    def star(self, id: str) -> None:
        """Star the item with the given id.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        self._requestChecked('star', {'id': id})

    def unstar(self, id: str) -> None:
        """Remove the star from the item with the given id.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        self._requestChecked('unstar', {'id': id})

    def scrobble(self, id: str, time: int | None = None, submission: bool | None = True) -> None:
        """Register playback of a media item.

        https://opensubsonic.netlify.app/docs/endpoints/scrobble/

        Args:
            id: unique identifier of the media to scrobble.
            time: OPTIONAL posix time of when the song was listened to.
            submission: OPTIONAL whether this is a "submission" or a
                "now playing" notification. Default: True. None omits the
                parameter from the request.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        params: dict = {'id': id}
        if time:
            params['time'] = time
        if submission is not None:
            # Send a lowercase boolean rather than Python's 'True'/'False'.
            params['submission'] = 'true' if submission else 'false'
        self._requestChecked('scrobble', params)

    def getRandomSongs(self,
                       size: int | None = 10,
                       genre: str | None = None,
                       fromYear: str | int | None = None,
                       toYear: str | int | None = None,
                       musicFolderId: str | int | None = None
                       ) -> list[Song]:
        """Return random songs, optionally filtered.

        https://opensubsonic.netlify.app/docs/endpoints/getrandomsongs/

        Only filters with a truthy value are sent to the server.

        Raises:
            BadSubsonicResponse: if the server reports a failure.
        """
        params: dict = {}
        # The default being 10 is only to make the method match the internal docs
        if size and size != 10:
            params['size'] = size
        if genre:
            params['genre'] = genre
        if fromYear:
            params['fromYear'] = fromYear
        if toYear:
            params['toYear'] = toYear
        if musicFolderId:
            params['musicFolderId'] = musicFolderId

        response = self._requestChecked('getRandomSongs', params)
        return [Song(song) for song in response.get('randomSongs', {}).get('song', [])]

    def getUser(self, username: str) -> dict:
        """Return the 'subsonic-response' object of getUser, status unchecked."""
        return self._requestJson('getUser', {'username': username})

    def getNowPlaying(self) -> dict:
        """Return the 'subsonic-response' object of getNowPlaying, status unchecked."""
        return self._requestJson('getNowPlaying')

    def getArtist(self, artist_id: str,
                  raw_json: bool | None = False) -> "dict | requests.Response":
        """Fetch an artist.

        Returns:
            The 'subsonic-response' dict when *raw_json* is truthy, otherwise
            the unparsed ``requests.Response``.
        """
        response = self._request('getArtist', {'id': artist_id})
        if raw_json:
            return response.json()['subsonic-response']
        return response

    def getAlbum(self, album_id: str) -> dict:
        """Return the 'subsonic-response' object of getAlbum, status unchecked."""
        return self._requestJson('getAlbum', {'id': album_id})