# 490 lines, 15 KiB, Python
import time
|
|
import random
|
|
import string
|
|
import hashlib
|
|
import logging
|
|
import urllib.parse
|
|
|
|
import requests
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class Playlist:
|
|
id: str
|
|
name: str
|
|
songCount: int
|
|
duration: str
|
|
owner: str
|
|
coverArtId: str
|
|
|
|
def __init__(self, info: dict):
|
|
self.id = info['id']
|
|
self.name = info['name']
|
|
self.songCount = int(info['songCount'])
|
|
self.duration = time.strftime('%H:%M:%S', time.gmtime(int(info['duration'])))
|
|
self.owner = info['owner']
|
|
self.coverArtId = info['coverArt']
|
|
|
|
class Album:
|
|
id: str
|
|
title: str
|
|
name: str
|
|
album: str
|
|
coverArtId: str
|
|
duration: int
|
|
artistId: str
|
|
songCount: int
|
|
playCount: int
|
|
year: int | None
|
|
duration: str
|
|
|
|
def __init__(self, album_info: dict):
|
|
self.id = album_info['id']
|
|
self.title = album_info['title']
|
|
self.name = album_info['name']
|
|
self.album = album_info['album']
|
|
self.coverArtId = album_info['coverArt']
|
|
self.duration = int(album_info['duration'])
|
|
self.artistId = album_info['artistId']
|
|
self.songCount = int(album_info['songCount'])
|
|
self.playCount = int(album_info['playCount']) if 'playCount' in album_info else 0
|
|
self.year = int(album_info['year']) if 'year' in album_info else None
|
|
self.duration = self._secondToMinutes(int(album_info['duration']))
|
|
|
|
def _secondToMinutes(self, s: int) -> str:
|
|
return f'{s // 60}:{s % 60 if s % 60 > 9 else "0" + str(s % 60)}'
|
|
|
|
class Song:
|
|
id: str
|
|
title: str
|
|
album: str
|
|
artist: str
|
|
trackNumber: int
|
|
contentType: str
|
|
suffix: str
|
|
duration: int
|
|
bitRate: int
|
|
path: str
|
|
discNumber: int
|
|
albumId: str
|
|
artistId: str
|
|
comment: str
|
|
channelCount: int
|
|
samplingRate: int
|
|
playCount: int | None
|
|
coverArtId: str | None
|
|
|
|
def __init__(self, info: dict | None = {}):
|
|
self.id = info['id'] if 'id' in info else ""
|
|
self.title = info['title'] if 'title' in info else ""
|
|
self.album = info['album'] if 'album' in info else ""
|
|
self.artist = info['artist'] if 'artist' in info else ""
|
|
self.trackNumber = int(info['track']) if 'track' in info else ""
|
|
self.contentType = info['contentType'] if 'contentType' in info else ""
|
|
self.suffix = info['suffix'] if 'suffix' in info else ""
|
|
self.duration = int(info['duration']) if 'duration' in info else ""
|
|
self.bitRate = int(info['bitRate']) if 'bitRate' in info else ""
|
|
self.path = info['path'] if 'path' in info else ""
|
|
self.discNumber = int(info['discNumber']) if 'discNumber' in info else ""
|
|
self.albumId = info['albumId'] if 'albumId' in info else ""
|
|
self.artistId = info['artistId'] if 'artistId' in info else ""
|
|
self.comment = info['comment'] if 'comment' in info else ""
|
|
self.channelCount = int(info['channelCount']) if 'channelCount' in info else ""
|
|
self.samplingRate = int(info['samplingRate']) if 'samplingRate' in info else ""
|
|
self.playCount = int(info['playCount']) if 'playCount' in info else ""
|
|
self.coverArtId = info['coverArt'] if 'coverArt' in info else None
|
|
|
|
class Artist:
|
|
id: str
|
|
name: str
|
|
albumCount: int
|
|
coverArt: str
|
|
artistImageUrl: str
|
|
|
|
def __init__(self, id: str, name: str, albumCount: str | int, coverArt: str, artistImageUrl: str):
|
|
self.id = id
|
|
self.name = name
|
|
self.albumCount = int(albumCount)
|
|
self.coverArt = coverArt
|
|
self.artistImageUrl = artistImageUrl
|
|
|
|
def from_dict(info: dict):
|
|
return Artist(
|
|
info['id'] if 'id' in info else None,
|
|
info['name'] if 'name' in info else None,
|
|
info['albumCount'] if 'albumCount' in info else None,
|
|
info['coverArt'] if 'coverArt' in info else None,
|
|
info['artistImageUrl'] if 'artistImageUrl' in info else None
|
|
)
|
|
|
|
class UserNowPlaying:
|
|
id: str
|
|
title: str
|
|
album: str
|
|
artist: str
|
|
track: int
|
|
year: int
|
|
coverArt: str | None
|
|
duration: int
|
|
|
|
class BadSubsonicResponse(Exception):
    """Raised when a response's ``status`` field is anything other than ``'ok'``."""
|
|
|
|
class SubpyConn:
    """Small client for a Subsonic-style REST API, using HTTP GET requests.

    Every request carries the salted-token authentication parameters built
    by ``_getBasePayload`` and asks for JSON output (``f=json``).

    Attributes:
        username: Account name sent as ``u``.
        password: Account password; only its salted MD5 digest is sent.
        base_url: Server root; ``/rest/<endpoint>`` is appended to it.
        salt: Salt used for the most recently built payload.
        salt_len: Number of characters in each generated salt.
        version: Protocol version sent as ``v``.
        client_name: Client identifier sent as ``c``.
    """

    username: str
    password: str
    base_url: str
    salt: str
    salt_len: int = 8
    version: str = "1.16.1"
    client_name: str = "testing"

    def __init__(self, u: str, p: str, base_url: str, salt_len: int | None = 8):
        """Store credentials and connection settings; no request is made.

        Args:
            u: Username.
            p: Password.
            base_url: Server root URL (with or without a trailing slash).
            salt_len: Salt length; ``None`` selects the class default.
        """
        self.username = u
        self.password = p
        self.base_url = base_url
        # None used to be stored as-is and then crashed in range().
        self.salt_len = salt_len if salt_len is not None else type(self).salt_len

    # ------------------------------------------------------------------
    # Authentication / request plumbing
    # ------------------------------------------------------------------

    def _saltGenerator(self) -> None:
        """Replace ``self.salt`` with fresh random upper-case letters/digits."""
        alphabet = string.ascii_uppercase + string.digits
        rng = random.SystemRandom()
        self.salt = ''.join(rng.choice(alphabet) for _ in range(self.salt_len))

    def _hashPass(self) -> str:
        """Generate a new salt and return ``md5(password + salt)`` as hex."""
        self._saltGenerator()
        return hashlib.md5((self.password + self.salt).encode('utf-8')).hexdigest()

    def _getBasePayload(self) -> dict:
        """Return the parameters every request needs (auth, version, format)."""
        # Compute the token first: _hashPass() refreshes self.salt, and the
        # "s" entry below must be the salt that token was derived from.
        token = self._hashPass()
        return {
            "u": self.username,
            "t": token,
            "s": self.salt,
            "v": self.version,
            "c": self.client_name,
            "f": "json",
        }

    def _encodePayload(self, payload: dict) -> str:
        """URL-encode *payload* as a query string (spaces become ``+``)."""
        return urllib.parse.urlencode(payload, quote_via=urllib.parse.quote_plus)

    def _endpointUrl(self, endpoint: str) -> str:
        """Return ``<base_url>/rest/<endpoint>?`` without a doubled slash."""
        return f"{self.base_url.rstrip('/')}/rest/{endpoint}?"

    def _request(self, endpoint: str, params: dict | None = None, **kwargs) -> requests.Response:
        """GET *endpoint* with the base payload plus *params*.

        Extra keyword arguments are forwarded to ``requests.get``.
        """
        payload = self._getBasePayload()
        if params:
            payload.update(params)
        return requests.get(self._endpointUrl(endpoint) + self._encodePayload(payload), **kwargs)

    def _requestJson(self, endpoint: str, params: dict | None = None) -> dict:
        """GET *endpoint* and return the ``subsonic-response`` JSON object."""
        return self._request(endpoint, params).json()['subsonic-response']

    @staticmethod
    def _requireOk(response: dict) -> dict:
        """Return *response* unchanged, or raise if its status is not ``ok``.

        Raises:
            BadSubsonicResponse: When ``response['status'] != 'ok'``; the
                message includes the response's ``error`` entry, if any.
        """
        status = response['status']
        if status != 'ok':
            raise BadSubsonicResponse(
                f"Subsonic request failed (status={status!r}, error={response.get('error')!r})"
            )
        return response

    def _getMusicDirectory(self, id: str):
        """Issue ``getMusicDirectory`` for *id* and return the raw response."""
        return self._request('getMusicDirectory', {'id': id})

    # ------------------------------------------------------------------
    # Browsing
    # ------------------------------------------------------------------

    def getArtistAlbums(self, artist_id: str) -> list[Album]:
        """Return the children of directory *artist_id* as albums.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        response = self._requireOk(self._getMusicDirectory(artist_id).json()['subsonic-response'])
        # An empty directory may omit "child" altogether.
        return [Album(item) for item in response['directory'].get('child', [])]

    def getAlbumSongs(self, album_id: str) -> list[Song]:
        """Return the children of directory *album_id* as songs.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        response = self._requireOk(self._getMusicDirectory(album_id).json()['subsonic-response'])
        return [Song(item) for item in response['directory'].get('child', [])]

    def getArtists(self) -> list[Artist]:
        """Return every artist from ``getArtists``, flattened across indexes.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        response = self._requireOk(self._requestJson('getArtists'))

        artists: list[Artist] = []
        for artist_index in response['artists'].get('index', []):
            artists.extend(Artist.from_dict(artist) for artist in artist_index.get('artist', []))
        return artists

    def getAlbums(self, size: int | None = 25, offset: int | None = 0) -> list[Album]:
        """Return one page of albums, ordered alphabetically by name.

        Args:
            size: Page size; omitted from the request when ``None``.
            offset: Index of the first album; omitted when ``None``.

        Returns:
            The albums of the page, or ``[]`` when the page is empty.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        params: dict = {'type': 'alphabeticalByName'}
        # Sending the literal string "None" is never meaningful to the server.
        if size is not None:
            params['size'] = size
        if offset is not None:
            params['offset'] = offset

        response = self._requireOk(self._requestJson('getAlbumList', params))
        return [Album(album) for album in response['albumList'].get('album', [])]

    def getAllAlbums(self) -> list[Album]:
        """Return every album by paging ``getAlbums`` until a page is empty."""
        page_size = 50
        all_albums: list[Album] = []

        offset = 0
        while albums := self.getAlbums(page_size, offset):
            all_albums.extend(albums)
            offset += page_size

        return all_albums

    def getPlaylists(self) -> list[Playlist]:
        """Return the playlists reported by ``getPlaylists``.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        response = self._requireOk(self._requestJson('getPlaylists'))
        return [Playlist(playlist) for playlist in response['playlists'].get('playlist', [])]

    def getPlaylist(self, playlist_id: str) -> list[Song]:
        """Return the songs of playlist *playlist_id* (``[]`` when empty).

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        response = self._requireOk(self._requestJson('getPlaylist', {'id': playlist_id}))
        return [Song(song) for song in response['playlist'].get('entry', [])]

    def getStarred(self) -> list[Album | Song | Artist]:
        """Return starred albums, then artists, then songs, in one list.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        starred = self._requireOk(self._requestJson('getStarred'))['starred']

        results: list[Album | Song | Artist] = []
        # Albums come from the "album" list (it was previously read from
        # "artist", which fed artist objects to Album). Any of the three
        # lists may be absent when nothing of that kind is starred.
        results.extend(Album(album) for album in starred.get('album', []))
        results.extend(Artist.from_dict(artist) for artist in starred.get('artist', []))
        results.extend(Song(song) for song in starred.get('song', []))
        return results

    def getStarredSongs(self) -> list[Song]:
        """Return only the starred songs (``[]`` when there are none).

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        starred = self._requireOk(self._requestJson('getStarred'))['starred']
        return [Song(song) for song in starred.get('song', [])]

    # ------------------------------------------------------------------
    # Media
    # ------------------------------------------------------------------

    def stream(self, song_id: str) -> requests.Response:
        """Return the ``stream`` response for *song_id*.

        The request is made with ``stream=True``; the audio data is
        available through the returned object's ``content``.
        """
        return self._request('stream', {'id': song_id}, stream=True)

    def getCoverArt(self, cover_art_id: str, path: str | None = None) -> None:
        """Download cover art *cover_art_id* and write the bytes to a file.

        Args:
            cover_art_id: Identifier of the cover art to fetch.
            path: Destination file; ``cover.png`` in the working directory
                when omitted or empty.

        The response body is written as-is; it is not checked for being an
        image.
        """
        art = self._request('getCoverArt', {'id': cover_art_id})

        with open(path if path else 'cover.png', 'wb') as file:
            file.write(art.content)

    def getCoverArtRaw(self, cover_art_id: str) -> bytes:
        """Return the raw response body of ``getCoverArt`` for *cover_art_id*."""
        return self._request('getCoverArt', {'id': cover_art_id}).content

    def getSong(self, song_id: str) -> Song:
        """Return the song identified by *song_id*.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        response = self._requireOk(self._requestJson('getSong', {'id': song_id}))
        return Song(response['song'])

    # ------------------------------------------------------------------
    # Annotation
    # ------------------------------------------------------------------

    def star(self, id: str) -> None:
        """Star the item *id*.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        self._requireOk(self._requestJson('star', {'id': id}))

    def unstar(self, id: str) -> None:
        """Remove the star from item *id*.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        self._requireOk(self._requestJson('unstar', {'id': id}))

    def scrobble(self, id: str, time: int | None = None, submission: bool | None = True) -> None:
        """Register playback of a media item.

        https://opensubsonic.netlify.app/docs/endpoints/scrobble/

        Args:
            id: Unique identifier of the media to scrobble.
            time: Optional time at which the song was listened to; the
                linked specification defines it as milliseconds since
                1 Jan 1970. Left out of the request when falsy.
            submission: Whether this is a "submission" (``True``, default)
                or a "now playing" notification (``False``). ``None``
                leaves the parameter out so the server default applies.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        params: dict = {'id': id}

        if time:
            params['time'] = time

        if submission is not None:
            # The API spells booleans in lower case; str(True) would be "True".
            params['submission'] = 'true' if submission else 'false'

        self._requireOk(self._requestJson('scrobble', params))

    def getRandomSongs(self,
                       size: int | None = 10,
                       genre: str | None = None,
                       fromYear: str | int | None = None,
                       toYear: str | int | None = None,
                       musicFolderId: str | int | None = None
                       ) -> list[Song]:
        """Return random songs, optionally filtered.

        https://opensubsonic.netlify.app/docs/endpoints/getrandomsongs/

        Args:
            size: Number of songs; only sent when truthy and different
                from 10.
            genre: Only sent when truthy.
            fromYear: Only sent when truthy.
            toYear: Only sent when truthy.
            musicFolderId: Only sent when truthy.

        Raises:
            BadSubsonicResponse: If the server reports a non-ok status.
        """
        params: dict = {}

        # The default of 10 only mirrors the documented signature, so it is
        # not sent explicitly.
        if size and size != 10:
            params['size'] = size

        optional_filters = (
            ('genre', genre),
            ('fromYear', fromYear),
            ('toYear', toYear),
            ('musicFolderId', musicFolderId),
        )
        for key, value in optional_filters:
            if value:
                params[key] = value

        response = self._requireOk(self._requestJson('getRandomSongs', params))
        return [Song(song) for song in response['randomSongs'].get('song', [])]

    # ------------------------------------------------------------------
    # Raw lookups (status is left for the caller to inspect)
    # ------------------------------------------------------------------

    def getUser(self, username: str) -> dict:
        """Return the unchecked ``subsonic-response`` object of ``getUser``."""
        return self._requestJson('getUser', {'username': username})

    def getNowPlaying(self) -> dict:
        """Return the unchecked ``subsonic-response`` object of ``getNowPlaying``."""
        return self._requestJson('getNowPlaying')

    def getArtist(self, artist_id: str, raw_json: bool | None = False) -> dict | requests.Response:
        """Fetch ``getArtist`` for *artist_id*.

        Returns:
            The ``subsonic-response`` dict when *raw_json* is truthy,
            otherwise the ``requests.Response`` object itself.
        """
        response = self._request('getArtist', {'id': artist_id})

        if raw_json:
            return response.json()['subsonic-response']

        return response

    def getAlbum(self, album_id: str) -> dict:
        """Return the unchecked ``subsonic-response`` object of ``getAlbum``."""
        return self._requestJson('getAlbum', {'id': album_id})
|