def assign_code_to_user(self, telegram_id, code_type, code_id, expires_in_hours=24):
    """Record that a code has been assigned to a Telegram user.

    Inserts one row into ``user_requests`` through ``self.conn`` and commits
    it immediately.

    Args:
        telegram_id: Value stored in the ``telegram_id`` column.
        code_type: Value stored in the ``code_type`` column.
        code_id: Value stored in the ``assigned_code_id`` column.
        expires_in_hours: Hours from the current local time until the
            assignment expires (default 24).
    """
    # The expiry is a naive local-time datetime, passed to the driver as-is.
    deadline = datetime.now() + timedelta(hours=expires_in_hours)
    row = (telegram_id, code_type, code_id, deadline)

    db_cursor = self.conn.cursor()
    db_cursor.execute(
        ''' INSERT INTO user_requests (telegram_id, code_type, assigned_code_id, expires_at) VALUES (?, ?, ?, ?) ''',
        row,
    )
    self.conn.commit()
if query.data == 'get_xtream': await get_xtream_code(update, context) elif query.data == 'get_stbemu': await get_stbemu_code(update, context) elif query.data == 'my_codes': await show_my_codes(update, context) elif query.data == 'help': await show_help(update, context) async def get_xtream_code(update: Update, context: ContextTypes.DEFAULT_TYPE): """Assign Xtream Code to user""" user_id = str(update.effective_user.id)
# docker-compose.yml version: '3.8'
def is_allowed(self, user_id):
    """Decide whether ``user_id`` may make another request right now.

    Timestamps recorded for the user that are older than
    ``self.time_window`` seconds are ignored. If fewer than
    ``self.max_requests`` remain, the current time is recorded and the
    request is allowed.

    Args:
        user_id: Key into ``self.requests`` identifying the caller.

    Returns:
        True if the request is allowed (and has been recorded), False if
        the user has already reached the limit within the window.
    """
    current_time = datetime.now()

    # Build a fresh list holding only the timestamps still inside the window.
    recent = []
    for stamp in self.requests[user_id]:
        if current_time - stamp < timedelta(seconds=self.time_window):
            recent.append(stamp)

    if len(recent) < self.max_requests:
        recent.append(current_time)
        self.requests[user_id] = recent
        return True

    # Limit reached: the stored history is left untouched on rejection.
    return False


rate_limiter = RateLimiter()