Il s'agit de Pypacker : la bibliothèque de manipulation de paquets la plus rapide et la plus simple pour Python. Il vous permet de créer des paquets manuellement en définissant chaque aspect de toutes les données d'en-tête, de disséquer les paquets en analysant les octets bruts des paquets, d'envoyer/recevoir des paquets sur différentes couches et d'intercepter des paquets.
Créez des paquets donnant des valeurs spécifiques ou prenez les valeurs par défaut :
from pypacker . layer3 . ip import IP
from pypacker . layer3 . icmp import ICMP
ip = IP ( src_s = "127.0.0.1" , dst_s = "192.168.0.1" , p = 1 ) +
ICMP ( type = 8 ) +
ICMP . Echo ( id = 123 , seq = 1 , body_bytes = b"foobar" )
# output packet
print ( "%s" % ip )
IP ( v_hl = 45 , tos = 0 , len = 2 A , id = 0 , off = 0 , ttl = 40 , p = 1 , sum = 3 B29 , src = b' x7f x00 x00 x01 ' , dst = b' xc0 xa8 x00 x01 ' , opts = [], handler = icmp )
ICMP ( type = 8 , code = 0 , sum = C03F , handler = echo )
Echo ( id = 7 B , seq = 1 , ts = 0 , bytes = b'foobar' )
Lisez les paquets du fichier (format pcap/tcpdump), analysez-les et réécrivez-les :
from pypacker import ppcap
from pypacker . layer12 import ethernet
from pypacker . layer3 import ip
from pypacker . layer4 import tcp
preader = ppcap . Reader ( filename = "packets_ether.pcap" )
pwriter = ppcap . Writer ( filename = "packets_ether_new.pcap" , linktype = ppcap . DLT_EN10MB )
for ts , buf in preader :
eth = ethernet . Ethernet ( buf )
if eth [ ethernet . Ethernet , ip . IP , tcp . TCP ] is not None :
print ( "%d: %s:%s -> %s:%s" % ( ts , eth [ ip . IP ]. src_s , eth [ tcp . TCP ]. sport ,
eth [ ip . IP ]. dst_s , eth [ tcp . TCP ]. dport ))
pwriter . write ( eth . bin ())
pwriter . close ()
Intercepter (et modifier) les paquets, par exemple pour MITM :
# Add iptables rule:
# iptables -I INPUT 1 -p icmp -j NFQUEUE --queue-balance 0:2
import time
from pypacker import interceptor
from pypacker . layer3 import ip , icmp
# ICMP Echo request intercepting
def verdict_cb ( ll_data , ll_proto_id , data , ctx ):
ip1 = ip . IP ( data )
icmp1 = ip1 [ icmp . ICMP ]
if icmp1 is None or icmp1 . type != icmp . ICMP_TYPE_ECHO_REQ :
return data , interceptor . NF_ACCEPT
echo1 = icmp1 [ icmp . ICMP . Echo ]
if echo1 is None :
return data , interceptor . NF_ACCEPT
pp_bts = b"PYPACKER"
print ( "changing ICMP echo request packet" )
echo1 . body_bytes = echo1 . body_bytes [: - len ( pp_bts )] + pp_bts
return ip1 . bin (), interceptor . NF_ACCEPT
ictor = interceptor . Interceptor ()
ictor . start ( verdict_cb , queue_ids = [ 0 , 1 , 2 ])
print ( "now sind a ICMP echo request to localhost: ping 127.0.0.1" )
time . sleep ( 999 )
ictor . stop ()
Envoyez et recevez des paquets :
# send/receive raw bytes
from pypacker import psocket
from pypacker . layer12 import ethernet
from pypacker . layer3 import ip
psock = psocket . SocketHndl ( mode = psocket . SocketHndl . MODE_LAYER_2 , timeout = 10 )
for raw_bytes in psock :
eth = ethernet . Ethernet ( raw_bytes )
print ( "Got packet: %r" % eth )
eth . reverse_address ()
eth . ip . reverse_address ()
psock . send ( eth . bin ())
# stop on first packet
break
psock . close ()
# send/receive using filter
from pypacker import psocket
from pypacker . layer3 import ip
from pypacker . layer4 import tcp
packet_ip = ip . IP ( src_s = "127.0.0.1" , dst_s = "127.0.0.1" ) + tcp . TCP ( dport = 80 )
psock = psocket . SocketHndl ( mode = psocket . SocketHndl . MODE_LAYER_3 , timeout = 10 )
def filter_pkt ( pkt ):
return pkt . ip . tcp . sport == 80
psock . send ( packet_ip . bin (), dst = packet_ip . dst_s )
pkts = psock . recvp ( filter_match_recv = filter_pkt )
for pkt in pkts :
print ( "got answer: %r" % pkt )
psock . close ()
# Send/receive based on source/destination data
from pypacker import psocket
from pypacker . layer3 import ip
from pypacker . layer4 import tcp
packet_ip = ip . IP ( src_s = "127.0.0.1" , dst_s = "127.0.0.1" ) + tcp . TCP ( dport = 80 )
psock = psocket . SocketHndl ( mode = psocket . SocketHndl . MODE_LAYER_3 , timeout = 10 )
packets = psock . sr ( packet_ip , max_packets_recv = 1 )
for p in packets :
print ( "got layer 3 packet: %s" % p )
psock . close ()
Quelques exemples :
Voir examples/ et tests/test_pypacker.py.
Les tests sont exécutés comme suit :
Résultats des tests de performances : pypacker
orC = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, CPython v3.6
orP = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, Pypy 5.10.1
rounds per test: 10000
=====================================
>>> parsing (IP + ICMP)
orC = 86064 p/s
orP = 208346 p/s
>>> creating/direct assigning (IP only header)
orC = 41623 p/s
orP = 59370 p/s
>>> bin() without change (IP)
orC = 170356 p/s
orP = 292133 p/s
>>> output with change/checksum recalculation (IP)
orC = 10104 p/s
orP = 23851 p/s
>>> basic/first layer parsing (Ethernet + IP + TCP + HTTP)
orC = 62748 p/s
orP = 241047 p/s
>>> changing Triggerlist element value (Ethernet + IP + TCP + HTTP)
orC = 101552 p/s
orP = 201994 p/s
>>> changing Triggerlist/text based proto (Ethernet + IP + TCP + HTTP)
orC = 37249 p/s
orP = 272972 p/s
>>> direct assigning and concatination (Ethernet + IP + TCP + HTTP)
orC = 7428 p/s
orP = 14315 p/s
>>> full packet parsing (Ethernet + IP + TCP + HTTP)
orC = 6886 p/s
orP = 17040 p/s
Résultats des tests de performances : pypacker contre dpkt contre scapy
Comparing pypacker, dpkt and scapy performance (parsing Ethernet + IP + TCP + HTTP)
orC = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, CPython v3.6
orC2 = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, CPython v2.7
rounds per test: 10000
=====================================
>>> testing pypacker parsing speed
orC = 17938 p/s
>>> testing dpkt parsing speed
orC = 12431 p/s
>>> testing scapy parsing speed
orC2 = 726 p/s
Q : Par où dois-je commencer pour apprendre à utiliser Pypacker ?
R : Si vous connaissez déjà Scapy, commencer par lire les exemples devrait être OK. Sinon, la documentation contient une introduction générale à pypacker qui montre l'utilisation et les concepts de pypacker.
Q : Quelle est la vitesse de pypacker ?
R : Voir résultats ci-dessus. Pour des résultats détaillés sur votre machine, exécutez des tests.
Q : Y a-t-il de la documentation ?
R : Pypacker est basé sur le code de dpkt, qui à son tour n'avait aucune documentation officielle et très peu de documentation interne sur le code. Cela rendait délicate la compréhension du comportement interne. Après tout, la documentation du code a été largement étendue pour Pypacker. La documentation peut être trouvée dans ces répertoires et fichiers :
Les protocoles eux-mêmes (voir layerXYZ) n'ont généralement pas beaucoup de documentation car ils sont documentés par leurs RFC/normes officielles respectives.
Q : Quels protocoles sont pris en charge ?
R : Les protocoles actuellement pris en charge sont : Ethernet, Radiotap, IEEE80211, ARP, DNS, STP, PPP, OSPF, VRRP, DTP, IP, ICMP, PIM, IGMP, IPX, TCP, UDP, SCTP, HTTP, NTP, RTP, DHCP, RIP, SIP, Telnet, HSRP, diamètre, SSL, TPKT, Pmap, Radius, BGP
Q : Comment les protocoles sont-ils ajoutés ?
A : Réponse courte : étendez la classe Packet et ajoutez la variable de classe __hdr__
pour définir les champs d'en-tête. Réponse longue : voir examples/examples_new_protocol.py pour un exemple très complet.
Q : Comment puis-je contribuer à ce projet ?
R : Veuillez utiliser le outil de suivi des bogues Github pour les demandes de bogues/fonctionnalités. Veuillez lire le bugtracker pour les bogues déjà connus avant d'en déposer un nouveau. Les correctifs peuvent être envoyés via une pull request.
Q : Sous quelle licence Pypacker est délivré ?
R : Il s'agit de la Licence GPLv2 (voir le fichier LICENSE pour plus d'informations).
Q : Est-il prévu de prendre en charge le [protocole xyz] ?
R : La prise en charge de protocoles particuliers est ajoutée à Pypacker grâce à la contribution de personnes à ce support - il n'existe aucun plan formel pour ajouter la prise en charge de protocoles particuliers dans des versions futures particulières.
Q : Il y a un problème xyz avec Pypacker sous Windows 3.11/XP/7/8/mobile etc. Pouvez-vous résoudre ce problème ?
R : Les fonctionnalités de base devraient fonctionner avec n’importe quel système d’exploitation. Les options optionnelles peuvent causer des problèmes (par exemple, intercepteur) et il n'y aura pas de support pour cela. Pourquoi? Parce que la qualité compte et je ne soutiendrai pas les systèmes de qualité inférieure. Réfléchissez à deux fois avant de choisir un système d'exploitation et faites face aux conséquences ; ne blâmez pas les autres pour votre décision. Alternativement : donnez-moi une compensation monétaire et je verrai ce que je peux faire (;
# This will lazy parse only needed layers behind the scenes
if ether.src == "...":
...
elif ip.src == "...":
...
elif tcp.sport == "...":
...
pkt = Ethernet() + IP() + TCP()
# This parses ALL layers
packet_print = "%s" % pkt
packet_found = pkt[Telnet]
# Alternative: Use multi-value index-notation. This will stop parsing at any non-matching layer:
packet_found = pkt[Ethernet,IP,TCP,Telnet]
pkt = ip.IP(src_s="1.2.3.4", dst_s="1.2.3.5") + tcp.TCP()
# Disable checksum calculation (and any other update) for IP and TCP (only THIS packet instance)
pkt.sum_au_active = False
pkt.tcp.sum_au_active = False
bts = pkt.bin(update_auto_fields=False)
sysctl -w net.core.rmem_max=12582912
sysctl -w net.core.rmem_default=12582912
sysctl -w net.core.wmem_max=12582912
sysctl -w net.core.wmem_default=12582912
sysctl -w net.core.optmem_max=2048000
sysctl -w net.core.netdev_max_backlog=5000
sysctl -w net.unix.max_dgram_qlen=1000
sysctl -w net.ipv4.tcp_rmem="10240 87380 12582912"
sysctl -w net.ipv4.tcp_wmem="10240 87380 12582912"
sysctl -w net.ipv4.tcp_mem="21228 87380 12582912"
sysctl -w net.ipv4.udp_mem="21228 87380 12582912"
sysctl -w net.ipv4.tcp_window_scaling=1
sysctl -w net.ipv4.tcp_timestamps=1
sysctl -w net.ipv4.tcp_sack=1