如今各种语法糖越来越方便,以前很多难以直观理解的代码,通过语法糖包装下,变得越来越好用,典型的有 lambda表达式,协程, flow, rx 等等,用起来是不亦乐乎。
很对第三方sdk也逐渐从传统的callbck形式逐渐提供flow形式的接口,让适用方能够比较自然的接入。
比如调用一个方法调用房需要实时知道进度,假如有一个第三方模块提供一个接口如下,可以实时返回进度信息
def third_party_sdk(callback): # 模拟第三方SDK的生成 for i in range(10): callback(str(i))
那如果我们需要调用可以如下编写,非常简单,可以在拿到process时实时打印出当前进度
def process(): def print_process(current:str): print(current) third_party_sdk(print_process) if __name__ == "__main__": process()
但是这里有一层嵌套的内部函数代码,虽然可以提出去如果想要通过迭代器输出呢?比如我想通过迭代器输出是不是更清晰,比如
if __name__ == "__main__": for x in process(): print(x)
看起来很简单,但是实际上并不容易,因为不能修改调用的第三方函数,一旦调用第三方函数就是会阻塞,就没办法实时生成迭代器的data了。那怎么做呢?可以采用多线程+列队来实现,具体代码如下,希望对你有帮助。
import threading from queue import Queue def third_party_sdk(callback): # 模拟第三方SDK的生成 for i in range(10): callback(str(i)) def process(): q = Queue(maxsize=1) JOB_DONE = object() TIMEOUT = 30 msgs = [] def callback(msg): # todo 将message 通过yield返回给上层 # msgs.append(msg) q.put(msg) def task(): third_party_sdk(callback) q.put(JOB_DONE) t = threading.Thread(target=task) t.start() while True: # better set a timeout, or if task in sub-threading fails, it will result in a deadlock chunk = q.get(timeout=TIMEOUT) if chunk is JOB_DONE: break yield chunk t.join() if __name__ == "__main__": for x in process(): print(x)
虽然看起来多了很多代码,但是总归是能够实现的,再将这种封装抽成方法,那么再以后就能愉快的使用 process返回的迭代器来处理事件啦。