import asyncio
from ten import AsyncExtension, AsyncTenEnv
class DefaultAsyncExtension(AsyncExtension):
async def on_configure(self, ten_env: AsyncTenEnv) -> None:
# 模拟异步操作,例如网络、文件 I/O。
await asyncio.sleep(0.5)
async def on_init(self, ten_env: AsyncTenEnv) -> None:
# 模拟异步操作,例如网络、文件 I/O。
await asyncio.sleep(0.5)
async def on_start(self, ten_env: AsyncTenEnv) -> None:
# 模拟异步操作,例如网络、文件 I/O。
await asyncio.sleep(0.5)
async def on_deinit(self, ten_env: AsyncTenEnv) -> None:
# 模拟异步操作,例如网络、文件 I/O。
await asyncio.sleep(0.5)
async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None:
cmd_json = cmd.to_json()
ten_env.log_debug(f"DefaultAsyncExtension on_cmd: {cmd_json}")
# 模拟异步操作,例如网络、文件 I/O。
await asyncio.sleep(0.5)
# 向其他扩展发送新命令并等待结果。结果将返回给原始发送者。
new_cmd = Cmd.create("hello")
cmd_result = await ten_env.send_cmd(new_cmd)
ten_env.return_result(cmd_result, cmd)
import asyncio
from ten import AsyncExtension, AsyncTenEnv
class DefaultAsyncExtension(AsyncExtension):
queue = asyncio.Queue()
loop:asyncio.AbstractEventLoop = None
async def on_start(self, ten_env: AsyncTenEnv) -> None:
self.loop = asyncio.get_event_loop()
self.loop.create_task(self._consume())
async def on_stop(self, ten_env: AsyncTenEnv) -> None:
self.queue.put(None)
async def _consume(self) -> None:
while True
try:
value = await self.queue.get()
if value is None:
self.ten_env.log_info("async loop exit")
break
# 用于处理从队列检索的值的代码。
except Exception as e:
self.ten_env.log_error(f"Failed to handle {e}")