oandapy 是 OANDA REST API 的 Python 包裝器。
使用點:
$ pip install git+https://github.com/oanda/oandapy.git
oandapy 依賴 python-requests,它將自動安裝。
包含 oandapy 模組並使用您的帳戶憑證建立 oandapy 實例。對於 FxGame 和 FxTrade,必須提供存取令牌。
import oandapy
oanda = oandapy.API(environment="practice", access_token="abcdefghijk...")
函數的關鍵字參數會對應到 Oanda API 文件中每個端點可用的函數,因此此函式庫不會阻止您使用 API 進行變更。對於每個 api 調用,oandapy 都會傳回一個從 JSON 轉換而來的本機 python 對象,因此您無需這樣做。
oandapy.py 中的 EndpointsMixin 類別包含所有 Oanda API 端點的 mixin。
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
# required datetime functions
from datetime import datetime, timedelta
# sample account_id
account_id = 1813880
# set the trade to expire after one day
trade_expire = datetime.utcnow() + timedelta(days=1)
trade_expire = trade_expire.isoformat("T") + "Z"
response = oanda.create_order(account_id,
instrument="USD_CAD",
units=1000,
side='sell',
type='limit',
price=1.15,
expiry=trade_expire
)
建立自訂串流媒體類別來設定您想要處理資料的方式。每個刻度都透過on_success
和on_error
函數發送。由於這些方法是抽象方法,因此您需要重寫這些方法來處理流程資料。
以下範例從流中列印計數刻度,然後斷開連接。
class MyStreamer(oandapy.Streamer):
def __init__(self, count=10, *args, **kwargs):
super(MyStreamer, self).__init__(*args, **kwargs)
self.count = count
self.reccnt = 0
def on_success(self, data):
print data, "n"
self.reccnt += 1
if self.reccnt == self.count:
self.disconnect()
def on_error(self, data):
self.disconnect()
初始化自訂流送器的實例,並開始連接到流。請參閱 http://developer.oanda.com/rest-live/streaming/ 以了解更多文件。
account = "12345"
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.rates(account, instruments="EUR_USD,EUR_JPY,US30_USD,DE30_EUR")
相同的過程可用於流事件。
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.events(ignore_heartbeat=False)