如今各种语法糖越来越方便,以前很多难以直观理解的代码,通过语法糖包装下,变得越来越好用,典型的有 lambda表达式,协程, flow, rx 等等,用起来是不亦乐乎。
很对第三方sdk也逐渐从传统的callbck形式逐渐提供flow形式的接口,让适用方能够比较自然的接入。
比如调用一个方法调用房需要实时知道进度,假如有一个第三方模块提供一个接口如下,可以实时返回进度信息
def third_party_sdk(callback):
# 模拟第三方SDK的生成
for i in range(10):
callback(str(i))
那如果我们需要调用可以如下编写,非常简单,可以在拿到process时实时打印出当前进度
def process():
def print_process(current:str):
print(current)
third_party_sdk(print_process)
if __name__ == "__main__":
process()
但是这里有一层嵌套的内部函数代码,虽然可以提出去如果想要通过迭代器输出呢?比如我想通过迭代器输出是不是更清晰,比如
if __name__ == "__main__":
for x in process():
print(x)
看起来很简单,但是实际上并不容易,因为不能修改调用的第三方函数,一旦调用第三方函数就是会阻塞,就没办法实时生成迭代器的data了。那怎么做呢?可以采用多线程+列队来实现,具体代码如下,希望对你有帮助。
import threading
from queue import Queue
def third_party_sdk(callback):
# 模拟第三方SDK的生成
for i in range(10):
callback(str(i))
def process():
q = Queue(maxsize=1)
JOB_DONE = object()
TIMEOUT = 30
msgs = []
def callback(msg):
# todo 将message 通过yield返回给上层
# msgs.append(msg)
q.put(msg)
def task():
third_party_sdk(callback)
q.put(JOB_DONE)
t = threading.Thread(target=task)
t.start()
while True:
# better set a timeout, or if task in sub-threading fails, it will result in a deadlock
chunk = q.get(timeout=TIMEOUT)
if chunk is JOB_DONE:
break
yield chunk
t.join()
if __name__ == "__main__":
for x in process():
print(x)
虽然看起来多了很多代码,但是总归是能够实现的,再将这种封装抽成方法,那么再以后就能愉快的使用 process返回的迭代器来处理事件啦。
