oandapy 是 OANDA REST API 的 Python 包装器。
使用点:
$ pip install git+https://github.com/oanda/oandapy.git
oandapy 依赖于 python-requests,它将自动安装。
包含 oandapy 模块并使用您的帐户凭据创建 oandapy 实例。对于 FxGame 和 FxTrade,必须提供访问令牌。
import oandapy
oanda = oandapy.API(environment="practice", access_token="abcdefghijk...")
函数的关键字参数映射到 Oanda API 文档中每个端点可用的函数,因此此库不会阻止您使用 API 进行更改。对于每个 api 调用,oandapy 都会返回一个从 JSON 转换而来的本机 python 对象,因此您无需这样做。
oandapy.py 中的 EndpointsMixin 类包含所有 Oanda API 端点的 mixin。
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
# required datetime functions
from datetime import datetime, timedelta
# sample account_id
account_id = 1813880
# set the trade to expire after one day
trade_expire = datetime.utcnow() + timedelta(days=1)
trade_expire = trade_expire.isoformat("T") + "Z"
response = oanda.create_order(account_id,
instrument="USD_CAD",
units=1000,
side='sell',
type='limit',
price=1.15,
expiry=trade_expire
)
创建自定义流媒体类来设置您想要处理数据的方式。每个刻度都通过on_success
和on_error
函数发送。由于这些方法是抽象方法,因此您需要重写这些方法来处理流数据。
以下示例从流中打印计数刻度,然后断开连接。
class MyStreamer(oandapy.Streamer):
def __init__(self, count=10, *args, **kwargs):
super(MyStreamer, self).__init__(*args, **kwargs)
self.count = count
self.reccnt = 0
def on_success(self, data):
print data, "n"
self.reccnt += 1
if self.reccnt == self.count:
self.disconnect()
def on_error(self, data):
self.disconnect()
初始化自定义流送器的实例,并开始连接到流。请参阅 http://developer.oanda.com/rest-live/streaming/ 了解更多文档。
account = "12345"
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.rates(account, instruments="EUR_USD,EUR_JPY,US30_USD,DE30_EUR")
相同的过程可用于流事件。
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.events(ignore_heartbeat=False)