oandapy est un wrapper python pour l'API REST d'OANDA.
En utilisant pip :
$ pip install git+https://github.com/oanda/oandapy.git
oandapy dépend des requêtes python, qui seront installées automatiquement.
Incluez le module oandapy et créez une instance oandapy avec les informations d'identification de votre compte. Pour FxGame et FxTrade, un jeton d'accès doit être fourni.
import oandapy
oanda = oandapy.API(environment="practice", access_token="abcdefghijk...")
Les arguments de mots clés des fonctions sont mappés aux fonctions disponibles pour chaque point de terminaison dans la documentation de l'API Oanda, de sorte que les modifications apportées à l'API ne vous empêchent pas de les utiliser par cette bibliothèque. Pour chaque appel d'API, oandapy renvoie un objet python natif, converti à partir de JSON pour que vous n'ayez pas à le faire.
La classe EndpointsMixin dans oandapy.py contient un mix de tous les points de terminaison de l'API Oanda.
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
# required datetime functions
from datetime import datetime, timedelta
# sample account_id
account_id = 1813880
# set the trade to expire after one day
trade_expire = datetime.utcnow() + timedelta(days=1)
trade_expire = trade_expire.isoformat("T") + "Z"
response = oanda.create_order(account_id,
instrument="USD_CAD",
units=1000,
side='sell',
type='limit',
price=1.15,
expiry=trade_expire
)
Créez une classe de streamer personnalisée pour configurer la manière dont vous souhaitez gérer les données. Chaque tick est envoyé via les fonctions on_success
et on_error
. Étant donné que ces méthodes sont des méthodes abstraites, vous devez les remplacer pour gérer les données en streaming.
L'exemple suivant imprime le nombre de ticks du flux, puis se déconnecte.
class MyStreamer(oandapy.Streamer):
def __init__(self, count=10, *args, **kwargs):
super(MyStreamer, self).__init__(*args, **kwargs)
self.count = count
self.reccnt = 0
def on_success(self, data):
print data, "n"
self.reccnt += 1
if self.reccnt == self.count:
self.disconnect()
def on_error(self, data):
self.disconnect()
Initialisez une instance de votre streamer personnalisé et commencez à vous connecter au flux. Voir http://developer.oanda.com/rest-live/streaming/ pour une documentation supplémentaire.
account = "12345"
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.rates(account, instruments="EUR_USD,EUR_JPY,US30_USD,DE30_EUR")
La même procédure peut être utilisée pour les événements en streaming.
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.events(ignore_heartbeat=False)