هذا هو Pypacker: أسرع وأبسط ليب لمعالجة الحزم في لغة بايثون. فهو يتيح لك إنشاء حزم يدويًا عن طريق تحديد كل جانب من جوانب جميع بيانات الرأس، وتشريح الحزم عن طريق تحليل بايتات الحزمة الأولية، وإرسال/استقبال الحزم على طبقات مختلفة، واعتراض الحزم.
قم بإنشاء حزم تعطي قيمًا محددة أو خذ الإعدادات الافتراضية:
from pypacker . layer3 . ip import IP
from pypacker . layer3 . icmp import ICMP
ip = IP ( src_s = "127.0.0.1" , dst_s = "192.168.0.1" , p = 1 ) +
ICMP ( type = 8 ) +
ICMP . Echo ( id = 123 , seq = 1 , body_bytes = b"foobar" )
# output packet
print ( "%s" % ip )
IP ( v_hl = 45 , tos = 0 , len = 2 A , id = 0 , off = 0 , ttl = 40 , p = 1 , sum = 3 B29 , src = b' x7f x00 x00 x01 ' , dst = b' xc0 xa8 x00 x01 ' , opts = [], handler = icmp )
ICMP ( type = 8 , code = 0 , sum = C03F , handler = echo )
Echo ( id = 7 B , seq = 1 , ts = 0 , bytes = b'foobar' )
اقرأ الحزم من الملف (تنسيق pcap/tcpdump)، وقم بتحليلها ثم إعادة كتابتها:
from pypacker import ppcap
from pypacker . layer12 import ethernet
from pypacker . layer3 import ip
from pypacker . layer4 import tcp
preader = ppcap . Reader ( filename = "packets_ether.pcap" )
pwriter = ppcap . Writer ( filename = "packets_ether_new.pcap" , linktype = ppcap . DLT_EN10MB )
for ts , buf in preader :
eth = ethernet . Ethernet ( buf )
if eth [ ethernet . Ethernet , ip . IP , tcp . TCP ] is not None :
print ( "%d: %s:%s -> %s:%s" % ( ts , eth [ ip . IP ]. src_s , eth [ tcp . TCP ]. sport ,
eth [ ip . IP ]. dst_s , eth [ tcp . TCP ]. dport ))
pwriter . write ( eth . bin ())
pwriter . close ()
اعتراض (وتعديل) الحزم على سبيل المثال MITM:
# Add iptables rule:
# iptables -I INPUT 1 -p icmp -j NFQUEUE --queue-balance 0:2
import time
from pypacker import interceptor
from pypacker . layer3 import ip , icmp
# ICMP Echo request intercepting
def verdict_cb ( ll_data , ll_proto_id , data , ctx ):
ip1 = ip . IP ( data )
icmp1 = ip1 [ icmp . ICMP ]
if icmp1 is None or icmp1 . type != icmp . ICMP_TYPE_ECHO_REQ :
return data , interceptor . NF_ACCEPT
echo1 = icmp1 [ icmp . ICMP . Echo ]
if echo1 is None :
return data , interceptor . NF_ACCEPT
pp_bts = b"PYPACKER"
print ( "changing ICMP echo request packet" )
echo1 . body_bytes = echo1 . body_bytes [: - len ( pp_bts )] + pp_bts
return ip1 . bin (), interceptor . NF_ACCEPT
ictor = interceptor . Interceptor ()
ictor . start ( verdict_cb , queue_ids = [ 0 , 1 , 2 ])
print ( "now sind a ICMP echo request to localhost: ping 127.0.0.1" )
time . sleep ( 999 )
ictor . stop ()
إرسال واستقبال الحزم:
# send/receive raw bytes
from pypacker import psocket
from pypacker . layer12 import ethernet
from pypacker . layer3 import ip
psock = psocket . SocketHndl ( mode = psocket . SocketHndl . MODE_LAYER_2 , timeout = 10 )
for raw_bytes in psock :
eth = ethernet . Ethernet ( raw_bytes )
print ( "Got packet: %r" % eth )
eth . reverse_address ()
eth . ip . reverse_address ()
psock . send ( eth . bin ())
# stop on first packet
break
psock . close ()
# send/receive using filter
from pypacker import psocket
from pypacker . layer3 import ip
from pypacker . layer4 import tcp
packet_ip = ip . IP ( src_s = "127.0.0.1" , dst_s = "127.0.0.1" ) + tcp . TCP ( dport = 80 )
psock = psocket . SocketHndl ( mode = psocket . SocketHndl . MODE_LAYER_3 , timeout = 10 )
def filter_pkt ( pkt ):
return pkt . ip . tcp . sport == 80
psock . send ( packet_ip . bin (), dst = packet_ip . dst_s )
pkts = psock . recvp ( filter_match_recv = filter_pkt )
for pkt in pkts :
print ( "got answer: %r" % pkt )
psock . close ()
# Send/receive based on source/destination data
from pypacker import psocket
from pypacker . layer3 import ip
from pypacker . layer4 import tcp
packet_ip = ip . IP ( src_s = "127.0.0.1" , dst_s = "127.0.0.1" ) + tcp . TCP ( dport = 80 )
psock = psocket . SocketHndl ( mode = psocket . SocketHndl . MODE_LAYER_3 , timeout = 10 )
packets = psock . sr ( packet_ip , max_packets_recv = 1 )
for p in packets :
print ( "got layer 3 packet: %s" % p )
psock . close ()
بعض الأمثلة:
راجع الأمثلة/ والاختبارات/test_pypacker.py.
يتم تنفيذ الاختبارات على النحو التالي:
نتائج اختبار الأداء: pypacker
orC = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, CPython v3.6
orP = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, Pypy 5.10.1
rounds per test: 10000
=====================================
>>> parsing (IP + ICMP)
orC = 86064 p/s
orP = 208346 p/s
>>> creating/direct assigning (IP only header)
orC = 41623 p/s
orP = 59370 p/s
>>> bin() without change (IP)
orC = 170356 p/s
orP = 292133 p/s
>>> output with change/checksum recalculation (IP)
orC = 10104 p/s
orP = 23851 p/s
>>> basic/first layer parsing (Ethernet + IP + TCP + HTTP)
orC = 62748 p/s
orP = 241047 p/s
>>> changing Triggerlist element value (Ethernet + IP + TCP + HTTP)
orC = 101552 p/s
orP = 201994 p/s
>>> changing Triggerlist/text based proto (Ethernet + IP + TCP + HTTP)
orC = 37249 p/s
orP = 272972 p/s
>>> direct assigning and concatination (Ethernet + IP + TCP + HTTP)
orC = 7428 p/s
orP = 14315 p/s
>>> full packet parsing (Ethernet + IP + TCP + HTTP)
orC = 6886 p/s
orP = 17040 p/s
نتائج اختبار الأداء: pypacker مقابل dpkt مقابل scapy
Comparing pypacker, dpkt and scapy performance (parsing Ethernet + IP + TCP + HTTP)
orC = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, CPython v3.6
orC2 = Intel Core2 Duo CPU @ 1,866 GHz, 2GB RAM, CPython v2.7
rounds per test: 10000
=====================================
>>> testing pypacker parsing speed
orC = 17938 p/s
>>> testing dpkt parsing speed
orC = 12431 p/s
>>> testing scapy parsing speed
orC2 = 726 p/s
س : أين يجب أن أبدأ في تعلم استخدام Pypacker؟
ج : إذا كنت تعرف Scapy بالفعل، فيجب أن تبدأ بقراءة الأمثلة. بخلاف ذلك، هناك مقدمة عامة عن pypacker مدرجة في المستندات والتي توضح استخدام ومفاهيم pypacker.
س : ما مدى سرعة pypacker؟
ج : انظر النتائج أعلاه. للحصول على نتائج مفصلة على جهازك، قم بتنفيذ الاختبارات.
س : هل هناك أي وثائق؟
ج : يعتمد Pypacker على كود dpkt، والذي بدوره لا يحتوي على أي وثائق رسمية وقليل جدًا من التعليمات البرمجية الداخلية. وهذا جعل فهم السلوك الداخلي أمرًا صعبًا. بعد كل شيء، تم توسيع وثائق التعليمات البرمجية إلى حد كبير لـ Pypacker. يمكن العثور على الوثائق في هذه الدلائل والملفات:
البروتوكولات نفسها (انظر LayerXYZ) بشكل عام لا تحتوي على الكثير من الوثائق لأنها موثقة بواسطة RFCs/المعايير الرسمية الخاصة بها.
س : ما هي البروتوكولات المدعومة؟
ج : الحد الأدنى من البروتوكولات المدعومة حاليًا هي: Ethernet، Radiotap، IEEE80211، ARP، DNS، STP، PPP، OSPF، VRRP، DTP، IP، ICMP، PIM، IGMP، IPX، TCP، UDP، SCTP، HTTP، NTP، RTP، DHCP، RIP، SIP، Telnet، HSRP، القطر، SSL، TPKT، Pmap، نصف القطر، BGP
س : كيف تتم إضافة البروتوكولات؟
ج : إجابة مختصرة: قم بتوسيع فئة Packet وأضف متغير الفئة __hdr__
لتحديد حقول الرأس. إجابة طويلة: راجع الأمثلة/examples_new_protocol.py للحصول على مثال كامل للغاية.
س : كيف يمكنني المساهمة في هذا المشروع؟
ج : يرجى استخدام متتبع الأخطاء في Github لطلب الأخطاء/الميزات. يرجى قراءة أداة تعقب الأخطاء للتعرف على الأخطاء المعروفة بالفعل قبل تقديم خطأ جديد. يمكن إرسال التصحيحات عبر طلب السحب.
س : بموجب أي ترخيص يتم إصدار Pypacker؟
ج : إنه ترخيص GPLv2 (راجع ملف الترخيص لمزيد من المعلومات).
س : هل هناك أي خطط لدعم [البروتوكول xyz]؟
ج : تتم إضافة دعم لبروتوكولات معينة إلى Pypacker نتيجة لمساهمة الأشخاص في هذا الدعم - لا توجد خطط رسمية لإضافة دعم لبروتوكولات معينة في إصدارات مستقبلية معينة.
س : هناك مشكلة xyz مع Pypacker الذي يستخدم Windows 3.11/XP/7/8/mobile وما إلى ذلك. هل يمكنك إصلاح ذلك؟
ج : يجب أن تعمل الميزات الأساسية مع أي نظام تشغيل. قد تسبب العناصر الاختيارية مشكلة (مثل المعترض) ولن يكون هناك دعم لذلك. لماذا؟ لأن الجودة مهمة ولن أقدم الدعم للأنظمة الرديئة. فكر مرتين قبل اختيار نظام التشغيل وتعامل مع العواقب؛ لا تلوم الآخرين على قرارك. البديل: أعطني تعويضًا ماليًا وسأرى ما يمكنني فعله (;
# This will lazy parse only needed layers behind the scenes
if ether.src == "...":
...
elif ip.src == "...":
...
elif tcp.sport == "...":
...
pkt = Ethernet() + IP() + TCP()
# This parses ALL layers
packet_print = "%s" % pkt
packet_found = pkt[Telnet]
# Alternative: Use multi-value index-notation. This will stop parsing at any non-matching layer:
packet_found = pkt[Ethernet,IP,TCP,Telnet]
pkt = ip.IP(src_s="1.2.3.4", dst_s="1.2.3.5") + tcp.TCP()
# Disable checksum calculation (and any other update) for IP and TCP (only THIS packet instance)
pkt.sum_au_active = False
pkt.tcp.sum_au_active = False
bts = pkt.bin(update_auto_fields=False)
sysctl -w net.core.rmem_max=12582912
sysctl -w net.core.rmem_default=12582912
sysctl -w net.core.wmem_max=12582912
sysctl -w net.core.wmem_default=12582912
sysctl -w net.core.optmem_max=2048000
sysctl -w net.core.netdev_max_backlog=5000
sysctl -w net.unix.max_dgram_qlen=1000
sysctl -w net.ipv4.tcp_rmem="10240 87380 12582912"
sysctl -w net.ipv4.tcp_wmem="10240 87380 12582912"
sysctl -w net.ipv4.tcp_mem="21228 87380 12582912"
sysctl -w net.ipv4.udp_mem="21228 87380 12582912"
sysctl -w net.ipv4.tcp_window_scaling=1
sysctl -w net.ipv4.tcp_timestamps=1
sysctl -w net.ipv4.tcp_sack=1