116 lines
4.1 KiB
Python
116 lines
4.1 KiB
Python
import os
|
|
import importlib
|
|
|
|
import nerimity
|
|
import pylast
|
|
import aiosqlite
|
|
|
|
from pyzxz.pyzxz.pyzxz import ZeroXZero # :sob:
|
|
|
|
from lastfmcollagegenerator.collage_generator import CollageGenerator
|
|
# from catbox_async_uploader.catbox_async_uploader.core import CatboxAsyncUploader
|
|
|
|
import utils as u
|
|
|
|
class Bot(nerimity.Client):
    """Extended client class for extra functionality.

    On top of ``nerimity.Client`` this class holds a Last.fm network client,
    a collage generator, a 0x0 upload client and an aiosqlite connection
    whose ``lastfm_users`` table maps user ids to Last.fm usernames.
    """

    def __init__(self, lastfm_api_key: str, lastfm_api_secret: str,
                 owner_id: int = None, db_file: str = "database.db",
                 *args, **kwargs):
        """Create the bot and its helper clients.

        Args:
            lastfm_api_key: Last.fm API key, passed to both pylast and the
                collage generator.
            lastfm_api_secret: Last.fm API secret, passed alongside the key.
            owner_id: Id compared against ``user.id`` in :meth:`is_owner`.
            db_file: Path handed to ``aiosqlite.connect`` by :meth:`init_db`.
            *args: Forwarded unchanged to ``nerimity.Client``.
            **kwargs: Forwarded unchanged to ``nerimity.Client``.
        """
        super().__init__(*args, **kwargs)

        self.owner_id = owner_id

        # initialize last.fm client
        self.lastfm = pylast.LastFMNetwork(
            api_key=lastfm_api_key,
            api_secret=lastfm_api_secret,
        )

        # initialize collage generator
        self.collage_generator = CollageGenerator(
            lastfm_api_key=lastfm_api_key,
            lastfm_api_secret=lastfm_api_secret,
        )

        # initialize catbox client
        # self.catbox_uploader = CatboxAsyncUploader(userhash=catbox_hash)

        # initialize 0x0 client
        self.zxz = ZeroXZero("https://x0.at")

        # database: the connection stays None until init_db() is awaited
        self.db = None
        self.db_file = db_file

    def load_commands(self, commands_dir: str):
        """Import every command module in ``commands_dir`` and register it.

        Each ``*.py`` file whose name does not start with ``_`` is imported
        as ``<package>.<module>``; when the module defines ``setup`` it is
        called with this bot instance.

        Args:
            commands_dir: Directory holding the command modules. It must
                also be importable as a package; path separators are
                turned into dots so ``"bot/commands"`` and ``"commands/"``
                resolve to ``bot.commands`` and ``commands``.
        """
        # normpath drops "./" prefixes and trailing separators before the
        # remaining separators are mapped to package dots.
        package = os.path.normpath(commands_dir).replace(os.sep, ".")

        # os.listdir order is arbitrary; sorting keeps load order stable.
        for filename in sorted(os.listdir(commands_dir)):
            if not filename.endswith(".py") or filename.startswith("_"):
                continue
            module_name = f"{package}.{filename[:-3]}"
            module = importlib.import_module(module_name)
            if hasattr(module, "setup"):
                module.setup(self)
                print(f"Loaded {module_name}")
        print("Registered commands:", self.commands)

    def is_owner(self, user: nerimity.Member = None):
        """Tell whether ``user`` is the configured owner.

        Returns:
            ``None`` when no user is given, otherwise whether ``user.id``
            equals ``owner_id``.
        """
        if not user:
            return None
        return user.id == self.owner_id

    def _process_slash_commands(self, message: nerimity.Message):
        """Run the parent's slash-command handling, skipping own messages."""
        if message.author.id == self.account.id:
            return
        super()._process_slash_commands(message)

    async def _process_commands(self, message: nerimity.Message):
        """Run the parent's command handling, skipping own messages."""
        if message.author.id == self.account.id:
            return
        await super()._process_commands(message)

    async def init_db(self):
        """Open the SQLite connection and create ``lastfm_users`` if missing.

        Calling this again while a connection is already open is a no-op,
        so an earlier connection is never silently dropped.
        """
        if self.db is not None:
            return

        print("Initializing database...")
        connection = await aiosqlite.connect(self.db_file)
        try:
            await connection.execute("""
                CREATE TABLE IF NOT EXISTS lastfm_users (
                    user_id TEXT PRIMARY KEY,
                    lastfm_username TEXT NOT NULL
                )
            """)
            await connection.commit()
        except Exception:
            # Do not leave a half-initialised connection open behind us.
            await connection.close()
            raise
        self.db = connection

    async def close_db(self):
        """Close the SQLite connection if one is open."""
        if self.db is not None:
            await self.db.close()
            self.db = None

    def _require_db(self):
        """Return the open connection or raise if init_db() has not run."""
        if self.db is None:
            raise RuntimeError(
                "Database is not initialized; call init_db() first."
            )
        return self.db

    async def set_lastfm(self, user_id: str, username: str):
        """Store (or overwrite) the Last.fm username linked to ``user_id``.

        Raises:
            RuntimeError: If :meth:`init_db` has not been awaited yet.
        """
        db = self._require_db()
        await db.execute(
            "INSERT OR REPLACE INTO lastfm_users (user_id, lastfm_username) VALUES (?, ?)",
            (user_id, username),
        )
        await db.commit()

    async def get_lastfm(self, user_id: str):
        """Return the Last.fm username stored for ``user_id``, or ``None``.

        Raises:
            RuntimeError: If :meth:`init_db` has not been awaited yet.
        """
        db = self._require_db()
        async with db.execute(
            "SELECT lastfm_username FROM lastfm_users WHERE user_id = ?",
            (user_id,),
        ) as cursor:
            row = await cursor.fetchone()
        return row[0] if row else None

    async def _lookup_lastfm(self, ctx: nerimity.Context, user_id):
        """Fetch the stored username for ``user_id``, reporting failures.

        Returns:
            ``(True, username_or_None)`` when the lookup ran, or
            ``(False, None)`` when it raised and the error was already
            printed and sent to ``ctx``.
        """
        try:
            return True, await self.get_lastfm(user_id)
        except Exception as e:
            print(e)
            await ctx.send(u.error_msg(f"Unknown database error:\n{e}"))
            return False, None

    async def find_lastfm_username(self, ctx: nerimity.Context, username: str = None):
        """Resolve the Last.fm username a command should operate on.

        * No ``username``: use the one stored for the command author.
        * ``username`` is a mention (``[@:<id>]``): use the one stored for
          the mentioned user.
        * Anything else: ``username`` is returned unchanged.

        Exactly one error message is sent to ``ctx`` when nothing can be
        resolved.

        Returns:
            The resolved username, or ``None`` when none could be resolved.
        """
        if not username:
            ok, stored = await self._lookup_lastfm(ctx, ctx.author.id)
            # A failed lookup was already reported; do not add a second,
            # misleading "please provide a username" message on top.
            if ok and not stored:
                await ctx.send(u.error_msg("Please provide a Last.fm username (or set yours with `/setfm`)."))
            return stored or None

        if not u.is_mention(username):
            return username

        # Strip the "[@:" prefix and "]" suffix to get the mentioned id.
        mentioned = self.get_user(username[3:-1])
        if mentioned is None:
            # NOTE(review): assumes get_user returns None for an unknown
            # id - confirm against the nerimity client.
            await ctx.send(u.error_msg("Couldn't find the mentioned user."))
            return None

        ok, stored = await self._lookup_lastfm(ctx, mentioned.id)
        if ok and not stored:
            await ctx.send(u.error_msg(f"[@:{mentioned.id}] doesn't seem to have an account set. Do so with `/setfm`."))
        # Never fall back to the raw mention text: it is not a username.
        return stored or None