Compare commits
2 commits
5c0cec74d0
...
26559dd270
| Author | SHA1 | Date | |
|---|---|---|---|
| 26559dd270 | |||
| d751c5ab45 |
1 changed files with 41 additions and 41 deletions
|
|
@ -1,11 +1,10 @@
|
|||
import requests
|
||||
import aiohttp
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
|
||||
class ZeroXZero:
|
||||
"""
|
||||
A static utility class for interacting with the 0x0.st file hosting service.
|
||||
An async utility class for interacting with the 0x0.st file hosting service.
|
||||
|
||||
Features:
|
||||
- Upload files from disk
|
||||
|
|
@ -14,54 +13,55 @@ class ZeroXZero:
|
|||
- Check service availability
|
||||
"""
|
||||
|
||||
ENDPOINT_URL = "https://0x0.st"
|
||||
HEADERS = {
|
||||
"User-Agent": "pyzxz-uploader/1.0 (https://github.com/yourrepo)"
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def upload(file_path: Union[str, Path]) -> str:
|
||||
def __init__(self, endpoint_url: str = "https://0x0.st"):
|
||||
self.endpoint_url = endpoint_url
|
||||
|
||||
async def upload(self, file_path: Union[str, Path]) -> str:
|
||||
file_path = Path(file_path)
|
||||
|
||||
if not file_path.exists():
|
||||
raise FileNotFoundError(f"No such file: {file_path}")
|
||||
|
||||
with file_path.open("rb") as f:
|
||||
response = requests.post(
|
||||
ZeroXZero.ENDPOINT_URL,
|
||||
files={"file": f},
|
||||
headers=ZeroXZero.HEADERS
|
||||
)
|
||||
async with aiohttp.ClientSession(headers=self.HEADERS) as session:
|
||||
with file_path.open("rb") as f:
|
||||
data = aiohttp.FormData()
|
||||
data.add_field("file", f, filename=file_path.name)
|
||||
async with session.post(
|
||||
self.endpoint_url,
|
||||
data=data
|
||||
) as response:
|
||||
text = await response.text()
|
||||
if response.status == 200 and text.startswith("https://"):
|
||||
return text.strip()
|
||||
raise ValueError(f"Upload failed: {text.strip()}")
|
||||
|
||||
if response.ok and response.text.startswith("https://"):
|
||||
return response.text.strip()
|
||||
raise ValueError(f"Upload failed: {response.text.strip()}")
|
||||
async def upload_from_bytes(self, data: bytes, filename: str) -> str:
|
||||
async with aiohttp.ClientSession(headers=self.HEADERS) as session:
|
||||
form = aiohttp.FormData()
|
||||
form.add_field("file", data, filename=filename)
|
||||
async with session.post(
|
||||
self.endpoint_url,
|
||||
data=form
|
||||
) as response:
|
||||
text = await response.text()
|
||||
if response.status == 200 and text.startswith("https://"):
|
||||
return text.strip()
|
||||
raise ValueError(f"Upload failed: {text.strip()}")
|
||||
|
||||
@staticmethod
|
||||
def upload_from_bytes(data: bytes, filename: str) -> str:
|
||||
files = {"file": (filename, data)}
|
||||
response = requests.post(
|
||||
ZeroXZero.ENDPOINT_URL,
|
||||
files=files,
|
||||
headers=ZeroXZero.HEADERS
|
||||
)
|
||||
async def upload_text(self, text: str, filename: str = "text.txt") -> str:
|
||||
return await self.upload_from_bytes(text.encode("utf-8"), filename)
|
||||
|
||||
if response.ok and response.text.startswith("https://"):
|
||||
return response.text.strip()
|
||||
raise ValueError(f"Upload failed: {response.text.strip()}")
|
||||
|
||||
@staticmethod
|
||||
def upload_text(text: str, filename: str = "text.txt") -> str:
|
||||
return ZeroXZero.upload_from_bytes(text.encode("utf-8"), filename)
|
||||
|
||||
@staticmethod
|
||||
def is_available() -> bool:
|
||||
async def is_available(self) -> bool:
|
||||
try:
|
||||
response = requests.head(
|
||||
ZeroXZero.ENDPOINT_URL,
|
||||
timeout=3,
|
||||
headers=ZeroXZero.HEADERS
|
||||
)
|
||||
return response.status_code == 200
|
||||
except requests.RequestException:
|
||||
return False
|
||||
async with aiohttp.ClientSession(headers=self.HEADERS) as session:
|
||||
async with session.head(
|
||||
self.endpoint_url,
|
||||
timeout=aiohttp.ClientTimeout(total=3)
|
||||
) as response:
|
||||
return response.status == 200
|
||||
except Exception:
|
||||
return False
|
||||
Loading…
Add table
Reference in a new issue