Pythonによるイベント駆動型アーキテクチャの実装

Python の標準ライブラリだけで、同一プロセス内にイベントキューを実装し、複数のプロデューサ(マーケットデータ取得・オーダー取得・ファンダメンタルズ取得)と1つのコンシューマ(アルゴリズムトレーディング処理)を動かすサンプルです。

python

import threading import queue import time import random from enum import Enum, auto # ─── イベントの定義 ─────────────────────────────────────────────── class EventType(Enum): MARKET_DATA = auto() ORDER_UPDATE = auto() FUNDAMENTALS = auto() class Event: def __init__(self, type: EventType, payload: dict): self.type = type self.payload = payload # ─── イベントキュー ─────────────────────────────────────────────── class EventQueue: def __init__(self): self._q = queue.Queue() def enqueue(self, event: Event): """ プロデューサから呼ばれる """ self._q.put(event) def dequeue(self) -> Event: """ コンシューマが呼ぶ """ return self._q.get() def task_done(self): """ 処理完了を通知 """ self._q.task_done() # ─── 各種プロデューサ(擬似 API 呼び出し)──────────────────────── def market_data_producer(eq: EventQueue): while True: # ここを実際の Bybit API 呼び出しに置き換える fake_price = random.uniform(30000, 40000) ev = Event(EventType.MARKET_DATA, {'price': fake_price, 'timestamp': time.time()}) eq.enqueue(ev) time.sleep(1) # 1秒毎に取得 def order_update_producer(eq: EventQueue): while True: # ここを実際の注文状況 API 呼び出しに置き換える ev = Event(EventType.ORDER_UPDATE, {'order_id': random.randint(1000,2000), 'status': 'FILLED'}) eq.enqueue(ev) time.sleep(5) # 5秒毎に取得 def fundamentals_producer(eq: EventQueue): while True: # ここを実際のニュースサイト/AIエージェント API 呼び出しに置き換える ev = Event(EventType.FUNDAMENTALS, {'headline': 'News at ' + time.ctime(), 'sentiment': random.choice([-1,0,1])}) eq.enqueue(ev) time.sleep(10) # 10秒毎に取得 # ─── コンシューマ:アルゴリズムトレーディング処理 ──────────────────── def algo_trading_consumer(eq: EventQueue): while True: ev = eq.dequeue() if ev.type == EventType.MARKET_DATA: handle_market_data(ev.payload) elif ev.type == EventType.ORDER_UPDATE: handle_order_update(ev.payload) elif ev.type == EventType.FUNDAMENTALS: handle_fundamentals(ev.payload) eq.task_done() def handle_market_data(data): print(f"[Market] price={data['price']:.2f} at {time.ctime(data['timestamp'])}") def handle_order_update(data): print(f"[Order ] id={data['order_id']} status={data['status']}") def handle_fundamentals(data): print(f"[Fund ] {data['headline']} (sentiment={data['sentiment']})") # ─── エントリポイント ───────────────────────────────────────────── def main(): eq = EventQueue() # プロデューサスレッド起動 for producer in (market_data_producer, order_update_producer, fundamentals_producer): t = threading.Thread(target=producer, args=(eq,), daemon=True) t.start() # コンシューマスレッド起動 consumer_thread = threading.Thread(target=algo_trading_consumer, args=(eq,), daemon=True) consumer_thread.start() # メインは何もしないで待機 try: while True: time.sleep(1) except KeyboardInterrupt: print("Shutting down.") if __name__ == "__main__": main()


解説

  • Event/EventType

  • EventType でイベントの種類を列挙。

  • Event は種類とペイロード(辞書型)をひも付け。

  • EventQueue

  • queue.Queue をラップし、スレッドセーフなキューを提供。

  • enqueue()dequeue() を使ってイベントの投入と取り出しを行い、task_done() で完了を通知。

  • プロデューサ

  • それぞれ異なる API(Bybit、注文 API、ニュース/AI エージェント API など)を擬似的にポーリングし、イベントを生成してキューへ enqueue()

  • コンシューマ

  • dequeue() でキューからイベントを取り出し、種類毎にハンドラ関数を呼び分ける。

  • 同一プロセス内

  • 全て同一プロセスのスレッドで動かすため、プロセス間通信のオーバーヘッド無しに高速に動作します。 この構成をベースに、実際の API 呼び出しやハンドラのロジックを埋め込めば、図示のイベント駆動型アーキテクチャが Python で動くようになります。