Итак, у меня есть трудоемкий вызов функции my_heavy_function
, и мне нужно перенаправить этот вывод в веб-интерфейс, который его вызывает, у меня есть метод для отправки сообщений в веб-интерфейс, давайте назовем этот метод async push_message_to_user()
.
в основном это что-то вроде
import time
def my_heavy_function():
time_on = 0
for i in range(20):
time.sleep(1)
print(f'time spend {time_on}')
time_on = time_on+1
async def push_message_to_user(message:str):
# some lib implementation
pass
if __name__ == "__main__":
my_heavy_function() # how push prints to the UI ?
возможно, есть способ дать my_heavy_function(stdout_obj)
и использовать этот std_object(StringIO) для выполнения чего-то вроде stdout_object.push(f'time spend {time_on}')
. Я могу это сделать, но я не могу изменить my_heavy_function()
на асинхронную версию, чтобы добавить push_message_to_user()
напрямую вместо print
(он используется другими неасинхронными подпрограммами)
то, что я хотел бы, это что-то вроде (псевдокод)
with contextlib.redirect_output(my_prints):
my_heavy_function()
while my_prints.readable():
# realtime push
await push_message_to_user(my_prints)
Спасибо!