From 5aaf6ede5d570582fa8a5ca7e26ab0d1dfcc60cc Mon Sep 17 00:00:00 2001 From: JISAUAY Date: Tue, 15 Apr 2025 16:35:13 -0500 Subject: [PATCH] init --- .gitignore | 3 + SubSonic.py | 479 +++++++++++++++++++++++++++++++++++++ main.py | 28 +++ music-history.db | Bin 0 -> 61440 bytes sql/get_album_id.sql | 3 + sql/get_artist_id.sql | 3 + sql/get_latest_song_id.sql | 4 + sql/get_song_id.sql | 4 + sql/get_user_id.sql | 3 + sql/insert_album.sql | 3 + sql/insert_artist.sql | 3 + sql/insert_song.sql | 4 + 12 files changed, 537 insertions(+) create mode 100644 .gitignore create mode 100644 SubSonic.py create mode 100644 main.py create mode 100644 music-history.db create mode 100644 sql/get_album_id.sql create mode 100644 sql/get_artist_id.sql create mode 100644 sql/get_latest_song_id.sql create mode 100644 sql/get_song_id.sql create mode 100644 sql/get_user_id.sql create mode 100644 sql/insert_album.sql create mode 100644 sql/insert_artist.sql create mode 100644 sql/insert_song.sql diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a5f9ef --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.vscode +__pycache__ +config.ini \ No newline at end of file diff --git a/SubSonic.py b/SubSonic.py new file mode 100644 index 0000000..71cb2a5 --- /dev/null +++ b/SubSonic.py @@ -0,0 +1,479 @@ +import time +import random +import string +import hashlib +import logging +import urllib.parse + +import requests + + +logger = logging.getLogger(__name__) + +class Playlist: + id: str + name: str + songCount: int + duration: str + owner: str + coverArtId: str + + def __init__(self, info: dict): + self.id = info['id'] + self.name = info['name'] + self.songCount = int(info['songCount']) + self.duration = time.strftime('%H:%M:%S', time.gmtime(int(info['duration']))) + self.owner = info['owner'] + self.coverArtId = info['coverArt'] + +class Album: + id: str + title: str + name: str + album: str + coverArtId: str + duration: int + artistId: str + songCount: int + playCount: int + year: int | None + duration: str + + def __init__(self, album_info: dict): + self.id = album_info['id'] + self.title = album_info['title'] + self.name = album_info['name'] + self.album = album_info['album'] + self.coverArtId = album_info['coverArt'] + self.duration = int(album_info['duration']) + self.artistId = album_info['artistId'] + self.songCount = int(album_info['songCount']) + self.playCount = int(album_info['playCount']) if 'playCount' in album_info else 0 + self.year = int(album_info['year']) if 'year' in album_info else None + self.duration = self._secondToMinutes(int(album_info['duration'])) + + def _secondToMinutes(self, s: int) -> str: + return f'{s // 60}:{s % 60 if s % 60 > 9 else "0" + str(s % 60)}' + +class Song: + id: str + title: str + album: str + artist: str + trackNumber: int + contentType: str + suffix: str + duration: int + bitRate: int + path: str + discNumber: int + albumId: str + artistId: str + comment: str + channelCount: int + samplingRate: int + playCount: int | None + coverArtId: str | None + + def __init__(self, info: dict | None = {}): + self.id = info['id'] if 'id' in info else "" + self.title = info['title'] if 'title' in info else "" + self.album = info['album'] if 'album' in info else "" + self.artist = info['artist'] if 'artist' in info else "" + self.trackNumber = int(info['track']) if 'track' in info else "" + self.contentType = info['contentType'] if 'contentType' in info else "" + self.suffix = info['suffix'] if 'suffix' in info else "" + self.duration = int(info['duration']) if 'duration' in info else "" + self.bitRate = int(info['bitRate']) if 'bitRate' in info else "" + self.path = info['path'] if 'path' in info else "" + self.discNumber = int(info['discNumber']) if 'discNumber' in info else "" + self.albumId = info['albumId'] if 'albumId' in info else "" + self.artistId = info['artistId'] if 'artistId' in info else "" + self.comment = info['comment'] if 'comment' in info else "" + self.channelCount = int(info['channelCount']) if 'channelCount' in info else "" + self.samplingRate = int(info['samplingRate']) if 'samplingRate' in info else "" + self.playCount = int(info['playCount']) if 'playCount' in info else "" + self.coverArtId = info['coverArt'] if 'coverArt' in info else None + +class Artist: + id: str + name: str + albumCount: int + coverArt: str + artistImageUrl: str + + def __init__(self, id: str, name: str, albumCount: str | int, coverArt: str, artistImageUrl: str): + self.id = id + self.name = name + self.albumCount = int(albumCount) + self.coverArt = coverArt + self.artistImageUrl = artistImageUrl + + def from_dict(info: dict): + return Artist( + info['id'] if 'id' in info else None, + info['name'] if 'name' in info else None, + info['albumCount'] if 'albumCount' in info else None, + info['coverArt'] if 'coverArt' in info else None, + info['artistImageUrl'] if 'artistImageUrl' in info else None + ) + +class UserNowPlaying: + id: str + title: str + album: str + artist: str + track: int + year: int + coverArt: str | None + duration: int + +class BadSubsonicResponse(Exception): + pass + +class SubpyConn: + username: str + password: str + base_url: str + salt: str + salt_len: int = 8 + version: str = "1.16.1" + client_name: str = "testing" + + def __init__(self, u: str, p: str, base_url: str, salt_len: int | None = 8): + self.username = u + self.password = p + self.base_url = base_url + + self.salt_len = salt_len + + def _saltGenerator(self) -> None: + self.salt = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(self.salt_len)) + + def _hashPass(self) -> str: + self._saltGenerator() + return hashlib.md5((self.password + self.salt).encode('utf-8')).hexdigest() + + def _getBasePayload(self) -> dict: + return { + "u" : self.username, + "t" : self._hashPass(), + "s" : self.salt, + "v" : self.version, + "c" : self.client_name, + "f": "json" + } + + def _encodePayload(self, payload: dict) -> str: + return urllib.parse.urlencode(payload, quote_via=urllib.parse.quote_plus) + + def _getMusicDirectory(self, id: str): + url = self.base_url + '/rest/getMusicDirectory?' + + payload = self._getBasePayload() + payload['id'] = id + + return requests.get(url + self._encodePayload(payload)) + + def getArtistAlbums(self, artist_id: str) -> list[Album]: + response: requests.Response = self._getMusicDirectory(artist_id).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + albums: list[Album] = [] + for item in response['directory']['child']: + albums.append(Album(item)) + + return albums + + def getAlbumSongs(self, album_id: str) -> list[Song]: + response: requests.Response = self._getMusicDirectory(album_id).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + songs: list[Song] = [] + for item in response['directory']['child']: + songs.append(Song(item)) + + return songs + + def getArtists(self) -> list[Artist]: + url = self.base_url + '/rest/getArtists?' + + payload = self._getBasePayload() + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + artists: list[Artist] = [] + + for artist_index in response['artists']['index']: + for artist in artist_index['artist']: + artists.append(Artist.from_dict(artist)) + + return artists + + def getAlbums(self, size: int | None = 25, offset: int | None = 0) -> list[Album]: + url = self.base_url + '/rest/getAlbumList?' + + payload = self._getBasePayload() + payload['type'] = 'alphabeticalByName' + payload['size'] = size + payload['offset'] = offset + + r = requests.get(url + self._encodePayload(payload)) + json_r = r.json() + + albums: list[Album] = [] + + if 'album' not in json_r['subsonic-response']['albumList']: + return [] + + for album in json_r['subsonic-response']['albumList']['album']: + albums.append(Album(album)) + + return albums + + def getAllAlbums(self) -> list[Album]: + all_albums: list[Album] = [] + + offset = 0 + while albums := self.getAlbums(50, offset): + all_albums.extend(albums) + offset += 50 + + return all_albums + + def getPlaylists(self) -> list[Playlist]: + url = self.base_url + "/rest/getPlaylists?" + + payload = self._getBasePayload() + + r = requests.get(url + self._encodePayload(payload)) + + json_r = r.json() + + playlists: list[Playlist] = [] + for playlist in json_r['subsonic-response']['playlists']['playlist']: + playlists.append(Playlist(playlist)) + + return playlists + + def getPlaylist(self, playlist_id: str) -> list[Song]: + url = self.base_url + "/rest/getPlaylist?" + + payload = self._getBasePayload() + payload['id'] = playlist_id + + r = requests.get(url + self._encodePayload(payload)) + + json_r = r.json() + + songs: list[Song] = [] + for song in json_r['subsonic-response']['playlist']['entry']: + songs.append(Song(song)) + + return songs + + def getStarred(self) -> list[Album | Song | Artist]: + url = self.base_url + "/rest/getStarred?" + + payload = self._getBasePayload() + + r = requests.get(url + self._encodePayload(payload)) + json_r = r.json() + + starred: list[Album | Song | Artist] = [] + for album in json_r['subsonic-response']['starred']['artist']: + starred.append(Album(album)) + + for artist in json_r['subsonic-response']['starred']['artist']: + starred.append(Artist.from_dict(artist)) + + for song in json_r['subsonic-response']['starred']['song']: + starred.append(Song(song)) + + return starred + + def getStarredSongs(self) -> list[Song]: + url = self.base_url + "/rest/getStarred?" + + payload = self._getBasePayload() + + r = requests.get(url + self._encodePayload(payload)) + json_r = r.json() + + starred: list[Song] = [] + for song in json_r['subsonic-response']['starred']['song']: + starred.append(Song(song)) + + return starred + + # Returns a requests Response object, the data stream can be found in Response.content + def stream(self, song_id: str) -> requests.Response: + url = self.base_url + '/rest/stream?' + + payload = self._getBasePayload() + + payload['id'] = song_id + + return requests.get(url + self._encodePayload(payload), stream=True) + + # Saves cover art id + def getCoverArt(self, cover_art_id: str, path: str | None = None) -> None: + url = self.base_url + '/rest/getCoverArt?' + + payload = self._getBasePayload() + payload['id'] = cover_art_id + + r = requests.get(url + self._encodePayload(payload)) + + with open(path if path else 'cover.png', 'wb') as file: + file.write(r.content) + + def getCoverArtRaw(self, cover_art_id: str) -> bytes: + url = self.base_url + '/rest/getCoverArt?' + + payload = self._getBasePayload() + payload['id'] = cover_art_id + + r = requests.get(url + self._encodePayload(payload)) + + return r.content + + def getSong(self, song_id: str) -> Song: + url = self.base_url + '/rest/getSong?' + + payload = self._getBasePayload() + payload['id'] = song_id + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + return Song(response['song']) + + def star(self, id: str) -> None: + url = self.base_url + '/rest/star?' + + payload = self._getBasePayload() + payload['id'] = id + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + def unstar(self, id: str) -> None: + url = self.base_url + '/rest/unstar?' + + payload = self._getBasePayload() + payload['id'] = id + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + """ + https://opensubsonic.netlify.app/docs/endpoints/scrobble/ + + id: str - unique identifier of the media to scrobble + OPTIONAL time: int - posix time of when the song was listened to + OPTIONAL submission: bool - Whether this is a "submission" or a "now playing" notification. Default: True + """ + def scrobble(self, id: str, time: int | None = None, submission: bool | None = True) -> None: + url = self.base_url + '/rest/scrobble?' + + payload = self._getBasePayload() + payload['id'] = id + + if time: + payload['time'] = time + + payload['submission'] = submission + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + """ + https://opensubsonic.netlify.app/docs/endpoints/getrandomsongs/ + """ + def getRandomSongs(self, + size: int | None = 10, + genre: str | None = None, + fromYear: str | int | None = None, + toYear: str | int | None = None, + musicFolderId: str | int | None = None + ) -> list[Song]: + url = self.base_url + '/rest/getRandomSongs?' + + payload = self._getBasePayload() + + # The default being 10 is only to make the method match the internal docs + if size and size != 10: + payload['size'] = size + + if genre: + payload['genre'] = genre + + if fromYear: + payload['fromYear'] = fromYear + + if toYear: + payload['toYear'] = toYear + + if musicFolderId: + payload['musicFolderId'] = musicFolderId + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + if response['status'] != 'ok': + raise BadSubsonicResponse() + + random_songs: list[Song] = [] + + for song in response['randomSongs']['song']: + random_songs.append(Song(song)) + + return random_songs + + def getUser(self, username: str) -> None: + url = self.base_url + '/rest/getUser?' + + payload = self._getBasePayload() + + payload['username'] = username + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + return response + + def getNowPlaying(self) -> dict: + url = self.base_url + "/rest/getNowPlaying?" + + payload = self._getBasePayload() + + response = requests.get(url + self._encodePayload(payload)).json()['subsonic-response'] + + return response + + def getArtist(self, artist_id: str, raw_json: bool | None = False) -> dict: + url = self.base_url + "/rest/getArtist?" + + payload = self._getBasePayload() + payload['id'] = artist_id + + response = requests.get(url + self._encodePayload(payload)) + + if raw_json: + response = response.json()['subsonic-response'] + + return response diff --git a/main.py b/main.py new file mode 100644 index 0000000..ef7211f --- /dev/null +++ b/main.py @@ -0,0 +1,28 @@ + +import time +import json +import configparser + +import SubSonic + +def main(): + config = configparser.ConfigParser() + config.read("config.ini") + + conn = SubSonic.SubpyConn( + config.get("Navidrome", "user"), + config.get("Navidrome", "pass"), + config.get("Navidrome", "url") + ) + + now_playing = conn.getNowPlaying() + + print(json.dumps(now_playing, indent=4)) + + artist_info = conn.getArtist(now_playing['nowPlaying']['entry'][0]['artistId'], raw_json=True) + + print(json.dumps(artist_info, indent=4)) + + +if __name__ == '__main__': + main() diff --git a/music-history.db b/music-history.db new file mode 100644 index 0000000000000000000000000000000000000000..2eb4ae9b11424ef4cfbb074762d8e1ca37283e60 GIT binary patch literal 61440 zcmeI)&2QUe9KdnA?UFbzbrB#_^?(%#wNY2vQgP!g8}1Y>OSdf1+EYkdGa}`si=8qd zacBpB$s}%^5C{GMu80%I9k_5AS58dgd7e1goR_XhLrBgys^1>J^!a^$kDtV{<-2!^ z4d1mt?{r&r-`>%dG+oy|v29J$QerU0upovxF}xMbi;4Q^gP9AaDec|8uQTaqS~~GW z%lwl1J^gF?aq{QX52$C>@i{sqTi6#@t#fB*sryaIvrGyNxPb5nn0 z`t`%6>pf^5)_k{hQd5hy_s0ri*UL`6;@Fk^wW4EZ$JS@iv7`Ike^+ydAX0ze>ZhON$b=$rdT9^wbEVQm4rq^lTuT9F> zAXQUW9@qUKhc9Lz>o;U-NAoX^w!2q$3O7pqs3f#Ja?UP0yH43DU3c!o$k&{H1B#NXM-Fimo-<$L>RMa)}@0wR+F*sO4Hv;o5t_OlBi> zna#I@&C_Qe#pBlUvVMNAkJCKtwY<6-Cq`&g;eRJ{)HxN|7xnsgJnKfx`gnONkE*oL zQPbpl;u0Hg_ct~cZS6bNJ-nl6UP1RTY%%#aVper?TBXN&(AxG`QC!q+NA~tPai|Zi zX@^pku5GJH{Psf3T3^=>R5(FP^4teKw|ykLdD*ZAi&sLqA6dC3NG#1Q0*~0R#|0009ILKmY**qAD=B{x|=OYJ|jv00IagfB*srAbf z^m!|xUcex)|8?>3KhOV1C-p>v00IagfB*srAbbUF|3@;t1cd+s2q1s}0tg_000IagfWS~d-v8(N zf5^cM0tg_000IagfB*srAb