Eagleget For Linux -
def get_data(self):
    """Return the current input values as a ``(url, path, threads)`` tuple.

    Returns:
        A 3-tuple of ``self.url_input.text()``, ``self.path_input.text()``
        and ``self.threads_spin.value()``, read in that order.
    """
    return (
        self.url_input.text(),
        self.path_input.text(),
        self.threads_spin.value(),
    )


# src/browser_integration.py
def format_size(self, size):
    """Format a byte count as a human-readable string with one decimal.

    Args:
        size: Number of bytes (int or float).

    Returns:
        A string such as ``"512.0 B"`` or ``"1.5 KB"``. Values of 1024 GB
        and above are all reported in TB.
    """
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024.0:
            # The braces are required: without them the f-string returns
            # the literal text "size:.1f unit" instead of the value.
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"
def save_task(self, task: DownloadTask):
    """Insert or replace the row for *task* in the ``downloads`` table.

    Args:
        task: The task to persist. Its ``status`` is stored via ``.value``
            and its timestamps as ISO-8601 strings; ``completed_at`` is
            stored as NULL when unset.
    """
    completed_at = task.completed_at.isoformat() if task.completed_at else None
    row = (
        task.id,
        task.url,
        task.filename,
        task.save_path,
        task.total_size,
        task.downloaded_size,
        task.status.value,
        task.threads,
        task.speed,
        task.created_at.isoformat(),
        completed_at,
        task.md5,
    )

    conn = sqlite3.connect(self.db_path)
    try:
        # `with conn` commits on success and rolls back on error; it does
        # not close the connection, hence the explicit `finally`.
        with conn:
            conn.execute(
                '''
                INSERT OR REPLACE INTO downloads
                (id, url, filename, save_path, total_size, downloaded_size,
                 status, threads, speed, created_at, completed_at, md5)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                row,
            )
    finally:
        conn.close()
class DownloadThread(threading.Thread):
    """Thread object that carries the state for downloading one task."""

    def __init__(self, task: DownloadTask):
        """Initialise the thread state for *task*.

        Args:
            task: The download task this thread works on.
        """
        super().__init__()
        self.task = task
        self.chunks: List[DownloadChunk] = []
        # Cooperative control flags, both initially off.
        self.paused = False
        self.stopped = False
        self.lock = threading.Lock()
        # Progress bookkeeping; unset until something records it.
        self.start_time = None
        self.last_update = None
        self.last_downloaded = 0


# eagleget for linux
def resume_download(self, task_id: str):
    """Restart the download for *task_id* if it is a known task.

    Unknown task ids are ignored.

    Args:
        task_id: Key of the task in ``self.tasks``.
    """
    if task_id not in self.tasks:
        return
    self.start_download(task_id)
def delete_task(self, task_id: str):
    """Delete the row with id *task_id* from the ``downloads`` table.

    Deleting an id that has no row is a no-op.

    Args:
        task_id: Value of the ``id`` column to delete.
    """
    conn = sqlite3.connect(self.db_path)
    try:
        # `with conn` commits on success and rolls back on error; the
        # connection itself still has to be closed explicitly.
        with conn:
            conn.execute('DELETE FROM downloads WHERE id = ?', (task_id,))
    finally:
        conn.close()
def stop(self):
    """Set the ``stopped`` flag on this object to ``True``."""
    self.stopped = True
def columnCount(self, parent=QModelIndex()):
    """Return the number of columns: one per entry in ``self.headers``.

    The *parent* index is accepted but not used.
    """
    column_total = len(self.headers)
    return column_total
import socket
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler


class DownloadHandler(BaseHTTPRequestHandler):
    """HTTP request handler that reads JSON bodies POSTed to ``/download``."""

    def do_POST(self):
        """Read and decode the JSON body of a POST to ``/download``.

        Requests to any other path are ignored. A missing ``Content-Length``
        header is treated as an empty body. A non-numeric or negative
        length, or a body that is not valid UTF-8 JSON, is answered with
        HTTP 400 instead of raising inside the handler.
        """
        if self.path != '/download':
            return

        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:
                # read(-1) would read until EOF, which blocks on a socket.
                raise ValueError('negative Content-Length')
            post_data = self.rfile.read(content_length)
            # UnicodeDecodeError and JSONDecodeError are both ValueErrors.
            data = json.loads(post_data.decode('utf-8'))
        except ValueError:
            self.send_error(400, 'Invalid download request')
            return
        # NOTE(review): the visible snippet ends after parsing; `data` is
        # not used and no success response is sent here. Confirm against
        # the full source.
def download_chunk(self, chunk: DownloadChunk):
    """Download one byte range of the task's URL into the ``.eagleget`` temp file.

    The request covers the bytes from ``chunk.start + chunk.downloaded``
    to ``chunk.end`` and the data is written at the matching offset of
    the temp file. ``chunk.downloaded`` is advanced under ``self.lock``
    as data arrives. The loop stops early when ``self.paused`` or
    ``self.stopped`` is set. Any failure is printed, not raised.

    Args:
        chunk: The range to fetch. ``chunk.end`` is sent as the last byte
            of the HTTP ``Range`` header, so it is presumably inclusive;
            verify against the code that creates chunks.
    """
    filepath = os.path.join(self.task.save_path, self.task.filename)
    temp_filepath = filepath + '.eagleget'

    # Resume from the progress recorded on the chunk. The earlier
    # "seek to chunk.start, then tell() - chunk.start" check always gave 0
    # and so discarded that progress whenever the temp file existed.
    range_start = chunk.start + chunk.downloaded
    if range_start > chunk.end:
        # Nothing left to fetch; an empty range would be rejected (HTTP 416).
        return

    headers = {'Range': f'bytes={range_start}-{chunk.end}'}

    try:
        # The context manager releases the streamed connection even when
        # the loop exits early on pause/stop.
        with requests.get(self.task.url, headers=headers, stream=True) as response:
            # Do not write an HTTP error page into the download file.
            response.raise_for_status()
            with open(temp_filepath, 'r+b') as f:
                f.seek(range_start)
                for data in response.iter_content(chunk_size=8192):
                    if self.paused or self.stopped:
                        break
                    if data:
                        f.write(data)
                        with self.lock:
                            chunk.downloaded += len(data)
    except Exception as e:
        # Best effort per chunk: report the failure and let the thread go on.
        print(f"Chunk {chunk.thread_id} failed: {e}")


# Truncated duplicate fragment left by extraction: def get_data(self): return (self
class DownloadManager:
    """Holds download tasks and their running threads, backed by a database file."""

    def __init__(self, db_path: str = "downloads.db"):
        """Set up empty in-memory state, then initialise and load the database.

        Args:
            db_path: Path of the database file. Defaults to ``downloads.db``.
        """
        self.db_path = db_path
        self.tasks: Dict[str, DownloadTask] = {}
        self.active_downloads: Dict[str, 'DownloadThread'] = {}
        self.queue = []
        # The database is initialised before the tasks are loaded from it.
        self.init_database()
        self.load_tasks()
sys.exit(app.exec_()) if __name__ == '__main__': main() Desktop Integration eagleget.desktop
def remove_download(self, task_id: str, delete_file: bool = False):
    """Stop and forget a download, optionally deleting its file.

    An unknown *task_id* no longer raises ``KeyError``: the in-memory
    steps are skipped and ``self.delete_task`` is still called.

    Args:
        task_id: Key of the task in ``self.tasks`` / ``self.active_downloads``.
        delete_file: When true, also delete the file at
            ``task.save_path/task.filename`` if it exists.
    """
    thread = self.active_downloads.get(task_id)
    if thread is not None:
        thread.stop()
        del self.active_downloads[task_id]

    task = self.tasks.get(task_id)
    if delete_file and task is not None:
        filepath = os.path.join(task.save_path, task.filename)
        try:
            os.remove(filepath)
        except FileNotFoundError:
            # Already gone: same outcome as the old exists() check, no race.
            pass

    self.tasks.pop(task_id, None)
    self.delete_task(task_id)


# Truncated duplicate fragment left by extraction: def get_data(self): return (self.url_input.text()
requests==2.28.2 PyQt5==5.15.9 pyperclip==1.8.2 python-magic==0.4.27 src/main.py
