oandapy ist ein Python-Wrapper für die REST-API von OANDA.
Pip verwenden:
$ pip install git+https://github.com/oanda/oandapy.git
oandapy ist auf Python-Anfragen angewiesen, die automatisch installiert werden.
Binden Sie das oandapy-Modul ein und erstellen Sie eine oandapy-Instanz mit Ihren Kontoanmeldeinformationen. Für FxGame und FxTrade muss ein Zugriffstoken bereitgestellt werden.
import oandapy
oanda = oandapy.API(environment="practice", access_token="abcdefghijk...")
Schlüsselwortargumente für Funktionen werden den Funktionen zugeordnet, die für jeden Endpunkt in den Oanda-API-Dokumenten verfügbar sind, sodass Änderungen an der API nicht von dieser Bibliothek daran gehindert werden, sie zu verwenden. Für jeden API-Aufruf gibt oandapy ein natives Python-Objekt zurück, das aus JSON konvertiert wurde, sodass Sie dies nicht tun müssen.
Die EndpointsMixin-Klasse in oandapy.py enthält einen Mixin aller Oanda-API-Endpunkte.
response = oanda.get_prices(instruments="EUR_USD")
prices = response.get("prices")
asking_price = prices[0].get("ask")
# required datetime functions
from datetime import datetime, timedelta
# sample account_id
account_id = 1813880
# set the trade to expire after one day
trade_expire = datetime.utcnow() + timedelta(days=1)
trade_expire = trade_expire.isoformat("T") + "Z"
response = oanda.create_order(account_id,
instrument="USD_CAD",
units=1000,
side='sell',
type='limit',
price=1.15,
expiry=trade_expire
)
Erstellen Sie eine benutzerdefinierte Streamer-Klasse, um festzulegen, wie Sie mit den Daten umgehen möchten. Jeder Tick wird über die Funktionen on_success
und on_error
gesendet. Da es sich bei diesen Methoden um abstrakte Methoden handelt, müssen Sie diese Methoden überschreiben, um die Streaming-Daten zu verarbeiten.
Das folgende Beispiel gibt die Anzahl der Ticks aus dem Stream aus und trennt dann die Verbindung.
class MyStreamer(oandapy.Streamer):
def __init__(self, count=10, *args, **kwargs):
super(MyStreamer, self).__init__(*args, **kwargs)
self.count = count
self.reccnt = 0
def on_success(self, data):
print data, "n"
self.reccnt += 1
if self.reccnt == self.count:
self.disconnect()
def on_error(self, data):
self.disconnect()
Initialisieren Sie eine Instanz Ihres benutzerdefinierten Streamers und stellen Sie eine Verbindung zum Stream her. Weitere Dokumentation finden Sie unter http://developer.oanda.com/rest-live/streaming/.
account = "12345"
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.rates(account, instruments="EUR_USD,EUR_JPY,US30_USD,DE30_EUR")
Das gleiche Verfahren kann für Streaming-Events verwendet werden.
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.events(ignore_heartbeat=False)