أولاً، سنشرح ماهية المزامنة وما هي مشكلات عدم المزامنة، ثم سنناقش الإجراءات التي يمكن اتخاذها للتحكم في المزامنة. بعد ذلك، سنقوم ببناء "تجمع مؤشرات الترابط" من جانب الخادم تمامًا كما حدث عندما قمنا بمراجعة الشبكة يوفر لنا JDK مجموعة أدوات متزامنة كبيرة، وأخيرًا سنستكشف محتوياتها.
لماذا مزامنة الموضوع؟
عندما يتعلق الأمر بمزامنة الخيوط، فإننا في معظم الحالات نناقش حالة " كائن واحد متعدد الخيوط "، والتي تنقسم بشكل عام إلى جزأين، أحدهما يتعلق بـ "المتغيرات المشتركة" والآخر يتعلق بـ "خطوات التنفيذ".
المتغيرات المشتركة
عندما نحدد متغيرًا عامًا في كائن مؤشر ترابط (قابل للتشغيل) وتقوم طريقة التشغيل بتعديل المتغير، إذا كانت مؤشرات الترابط المتعددة تستخدم كائن مؤشر الترابط في نفس الوقت، فسيتم تعديل قيمة المتغير العام في نفس الوقت، مما يسبب خطأ . دعونا نلقي نظرة على الكود التالي:
تشغيل الفراغ العام ()
{
System.out.println(Thread.currentThread().getName() + "ابدأ.");
لـ (int i = 1; i <= 100; i++)
{
مجموع += أنا؛
}
يحاول {
Thread.sleep(500);
} قبض على (InterruptedException e) {
printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "--- قيمة المجموع هي" + sum);
System.out.println(Thread.currentThread().getName() + "End.");
}
}
يرمي الفراغ الثابت الخاص ShareVaribleTest () InterruptedException
{
MyRunner runner = new MyRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
Thread1.start();
Thread2.start();
Thread1.join();
Thread2.join();
}
عندما نجري عمليات باستخدام سلاسل عمليات متعددة، قد نحتاج إلى دمج عمليات معينة باعتبارها "عمليات ذرية"، أي أنه يمكن اعتبار هذه العمليات "ذات ترابط واحد"، على سبيل المثال، قد نرغب في أن تبدو نتيجة الإخراج هكذا :
يلقي syncTest () الفراغ الثابت الخاص InterruptedException
{
MyNonSyncRunner runner = new MyNonSyncRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
Thread1.start();
Thread2.start();
Thread1.join();
Thread2.join();
}
نظرًا لأن مزامنة الخيط بها المشكلات المذكورة أعلاه، فكيف يجب أن نحلها؟ يمكننا اعتماد استراتيجيات مختلفة لمشاكل المزامنة الناتجة عن أسباب مختلفة.
التحكم في المتغيرات المشتركة
يمكننا التحكم في المتغيرات المشتركة بثلاث طرق.
تغيير "مؤشر الترابط المتعدد لكائن واحد" إلى "مؤشر الترابط المتعدد للكائنات المتعددة"
كما هو مذكور أعلاه، تحدث مشكلات المزامنة بشكل عام في سيناريوهات "كائن واحد متعدد الخيوط"، لذا فإن أبسط طريقة للتعامل معها هي تعديل النموذج قيد التشغيل إلى "كائن متعدد متعدد الخيوط" لمشكلة المزامنة في المثال أعلاه ، تعديل الكود النهائي هو كما يلي:
نظرًا لأن المشكلة ناجمة عن المتغيرات المشتركة، فيمكننا تغيير المتغيرات المشتركة إلى "غير مشتركة"، أي تعديلها إلى متغيرات محلية. يمكن أن يؤدي هذا أيضًا إلى حل المشكلة، بالنسبة للمثال أعلاه، يكون رمز هذا الحل كما يلي:
الفراغ الثابت الخاص ShareVaribleTest3() يلقي InterruptedException
{
MyRunner2 runner = new MyRunner2();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
Thread1.start();
Thread2.start();
Thread1.join();
Thread2.join();
}
ThreadLocal هي آلية مقدمة من JDK يتم استخدامها لحل المتغيرات المشتركة بين سلاسل الرسائل. المتغيرات المعلنة باستخدام ThreadLocal هي متغيرات عامة في سلسلة الرسائل.
يمكننا تحويل الكود أعلاه بهذه الطريقة كما يلي:
تشغيل الفراغ العام ()
{
System.out.println(Thread.currentThread().getName() + "ابدأ.");
لـ (int i = 0; i <= 100; i++)
{
إذا (tl.get() == فارغة)
{
tl.set(new Integer(0));
}
int sum = ((Integer)tl.get()).intValue();
sum+= i;
tl.set(new Integer(sum));
يحاول {
Thread.sleep(10);
} قبض على (InterruptedException e) {
printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- قيمة المجموع هي " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + "End.");
}
}
الفراغ الثابت الخاص ShareVaribleTest4() يلقي InterruptedException
{
MyRunner3 runner = new MyRunner3();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
Thread1.start();
Thread2.start();
Thread1.join();
Thread2.join();
}
السيطرة على خطوات التنفيذ
عند الحديث عن خطوات التنفيذ، يمكننا استخدام الكلمة الأساسية المتزامنة لحلها.
يلقي syncTest2 () الفراغ الثابت الخاص InterruptedException
{
MySyncRunner runner = new MySyncRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
Thread1.start();
Thread2.start();
Thread1.join();
Thread2.join();
}
موضوع الموضوع 1 = موضوع جديد ()
{
تشغيل الفراغ العام ()
{
System.out.println(Thread.currentThread().getName() + "ابدأ.");
عشوائي ص = عشوائي جديد (100)؛
متزامن (قائمة)
{
لـ (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("حجم القائمة هو" + list.size());
}
يحاول
{
Thread.sleep(500);
}
قبض على (InterruptedException على سبيل المثال)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "End.");
}
};
الموضوع thread2 = الموضوع الجديد ()
{
تشغيل الفراغ العام ()
{
System.out.println(Thread.currentThread().getName() + "ابدأ.");
عشوائي ص = عشوائي جديد (100)؛
متزامن (قائمة)
{
لـ (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("حجم القائمة هو" + list.size());
}
يحاول
{
Thread.sleep(500);
}
قبض على (InterruptedException على سبيل المثال)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "End.");
}
};
Thread1.start();
Thread2.start();
Thread1.join();
Thread2.join();
}
بناء تجمع المواضيع
لقد قمنا ببناء تجمع اتصال مأخذ التوصيل في <تحليل تطبيق اتصالات الشبكة بناءً على مراجعة Java>. قمنا هنا ببناء تجمع مؤشرات الترابط على هذا الأساس لإكمال عمليات التشغيل الأساسية والنوم والاستيقاظ والإيقاف.
الفكرة الأساسية هي الحفاظ على سلسلة من الخيوط في شكل مصفوفة، من خلال اتصال المقبس، يرسل العميل أوامر إلى الخادم، عندما يتلقى الخادم الأمر، يقوم بتشغيل الخيوط في مصفوفة الخيوط وفقًا للأمر المستلم.
يظل رمز عميل المقبس دون تغيير، ولا يزال الكود المستخدم عند إنشاء تجمع اتصال المقبس مستخدمًا، ونحن نركز بشكل أساسي على جانب الخادم.
أولاً، نحتاج إلى تحديد كائن مؤشر ترابط، والذي يتم استخدامه لتنفيذ عملياتنا التجارية، من أجل البساطة، نترك مؤشر الترابط في وضع السكون فقط.
تعداد ThreadTask
{
يبدأ،
قف،
ينام،
استيقظ
}
فئة MyThread تمتد الموضوع
{
حالة ThreadStatus العامة = ThreadStatus.Initial؛
مهمة ThreadTask العامة؛
تشغيل الفراغ العام ()
{
الحالة = ThreadStatus.Running؛
بينما (صحيح)
{
يحاول {
Thread.sleep(3000);
إذا (الحالة == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "أدخل إلى حالة السكون.");
this.wait();
}
} قبض على (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "حدث خطأ أثناء العملية.");
الحالة = ThreadStatus.Stopped؛
}
}
}
}
إدارة الفراغ الثابت العام (مؤشر ترابط MyThread، مهمة ThreadTask)
{
إذا (المهمة == ThreadTask.Start)
{
إذا (thread.status == ThreadStatus.Running)
{
يعود؛
}
إذا (thread.status == ThreadStatus.Stopped)
{
Thread = new MyThread();
}
thread.status = ThreadStatus.Running;
Thread.start();
}
وإلا إذا (المهمة == ThreadTask.Stop)
{
إذا (thread.status != ThreadStatus.Stopped)
{
Thread.interrupt();
thread.status = ThreadStatus.Stopped;
}
}
وإلا إذا (المهمة == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
وإلا إذا (المهمة == ThreadTask.Wakeup)
{
Thread.notify();
thread.status = ThreadStatus.Running;
}
}
سلسلة getThreadStatus العامة الثابتة (خيوط MyThread[])
{
StringBuffer sb = new StringBuffer();
لـ (int i = 0; i < Threads.length; i++)
{
sb.append(threads[i].getName() + "الحالة: " + threads[i].status).append("/r/n");
}
إرجاع sb.toString();
}
}
يلقي الفراغ الرئيسي العام (String[] args) IOException
{
تجمع MyThreadPool = new MyThreadPool(5);
}
عدد الصفحات الخاص int ؛
خيوط MyThread الخاصة[] = فارغة؛
يقوم MyThreadPool العام (عدد العمليات) بطرح IOException
{
this.threadCount = count;
Threads = new MyThread[count];
لـ (int i = 0; i < Threads.length; i++)
{
Threads[i] = new MyThread();
المواضيع[i].start();
}
الحرف الأول () ؛
}
الفراغ الخاص Init() يلقي IOException
{
ServerSocket serverSocket = new ServerSocket(5678);
بينما (صحيح)
{
مقبس المقبس النهائي = serverSocket.accept();
موضوع الموضوع = موضوع جديد ()
{
تشغيل الفراغ العام ()
{
يحاول
{
System.out.println("تم اكتشاف اتصال مقبس جديد.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
خط السلسلة = فارغ؛
بينما ((السطر = br.readLine()) != فارغ)
{
System.out.println(line);
إذا (line.equals("العد"))
{
System.out.println("يوجد 5 سلاسل رسائل في تجمع سلاسل الرسائل");
}
وإلا إذا (line.equals("الحالة"))
{
حالة السلسلة = MyThreadManager.getThreadStatus(threads);
System.out.println(status);
}
وإلا إذا (line.equals("StartAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Start);
}
وإلا إذا (line.equals("StopAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Stop);
}
وإلا إذا (line.equals("SleepAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Sleep);
}
وإلا إذا (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
}
وإلا إذا (line.equals("النهاية"))
{
استراحة؛
}
آخر
{
System.out.println("الأمر:" + خط);
}
ps.println("موافق");
ps.flush();
}
}
قبض (استثناء على سبيل المثال)
{
ex.printStackTrace();
}
}
};
Thread.start();
}
}
}
من أجل تبسيط عبء عمل المطورين أثناء التطوير متعدد الخيوط وتقليل الأخطاء في البرامج، توفر JDK مجموعة من مجموعات الأدوات المتزامنة، والتي يمكننا استخدامها لتطوير برامج متعددة الخيوط بشكل ملائم.
تجمع الموضوع
لقد قمنا بتنفيذ تجمع مؤشرات ترابط "بسيط" للغاية أعلاه، ويتم توفير تجمع مؤشرات الترابط أيضًا في مجموعة الأدوات المتزامنة، وهو مناسب جدًا للاستخدام.
يتم تقسيم تجمعات مؤشرات الترابط في مجموعة الأدوات المتزامنة إلى 3 فئات: SchededThreadPool وFixedThreadPool وCachedThreadPool.
أولاً نحدد كائنًا قابلاً للتشغيل
جدول الخيوط المجدول
هذا مشابه للمهمة المجدولة التي نستخدمها عادةً، أو يشبه إلى حد كبير المؤقت. يمكن أن يتسبب في بدء تشغيل سلسلة رسائل خلال فترة زمنية محددة، وتشغيلها مرة أخرى بعد فترة زمنية أخرى حتى يتم إغلاق تجمع مؤشرات الترابط.
رمز العينة كما يلي:
MyRunner runner = new MyRunner();
FinalchedFuture<?> Handler1 = Schedule.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
FinalchedFuture<?> Handler2 = Schedule.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
جدولة.جدول (جديد قابل للتشغيل ()
{
تشغيل الفراغ العام ()
{
Handler1.cancel(true);
Handler2.cancel(true);
جدولة.shutdown();
}
}، 30، TimeUnit.SECONDS
);
}
هذا عبارة عن تجمع سلاسل رسائل بسعة محددة، أي أنه يمكننا تحديد أنه يمكن تشغيل سلاسل رسائل متعددة على الأكثر في تجمع سلاسل الرسائل في نفس الوقت، وستكون هناك فرصة للخيوط الزائدة للتشغيل فقط عندما تكون هناك سلاسل رسائل خاملة في تجمع الموضوع.
خذ بعين الاعتبار الكود التالي:
هذا تجمع سلاسل رسائل آخر لا يتطلب سعة محددة وسيقوم بإنشاء سلاسل رسائل جديدة عند الحاجة.
استخدامه مشابه جدًا لـ FixedThreadPool، انظر إلى الكود التالي:
في بعض الحالات، نحتاج إلى استخدام قيمة الإرجاع للخيط. في جميع الرموز المذكورة أعلاه، يقوم الخيط بتنفيذ عمليات معينة دون أي قيمة إرجاع.
كيف تفعل هذا؟ يمكننا استخدام Callable<T> وCompletionService<T> في JDK، حيث يقوم الأول بإرجاع نتائج خيط واحد، بينما يقوم الأخير بإرجاع نتائج مجموعة من سلاسل الرسائل.
إرجاع النتائج من موضوع واحد
دعونا نلقي نظرة على الكود:
تحتاج إلى استخدام CompletionService<T> هنا، الرمز كما يلي:
Thread.sleep(1000);
ل(int i = 0; i < 10; i++)
{
نتيجة المستقبل<String> = Service.take();
System.out.println("قيمة الإرجاع للخيط هي" + result.get());
}
exec.shutdown();
}
يجب أن نكون جميعًا على دراية بنموذج المنتج والمستهلك، وعادةً ما نستخدم نوعًا ما من بنية البيانات لتنفيذه. في مجموعة الأدوات المتزامنة، يمكننا استخدام BlockingQueue لتنفيذ نموذج المنتج والمستهلك، على النحو التالي:
الفراغ العام الثابت الرئيسي (String[] args)
{
blockingQueueTest();
}
حظر الفراغ الثابت الخاصQueueTest()
{
Final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
Final int maxSleepTimeForSetter = 10;
Final int maxSleepTimerForGetter = 10;
أداة ضبط قابلة للتشغيل = Runnable() جديدة
{
تشغيل الفراغ العام ()
{
Random r = new Random();
بينما (صحيح)
{
قيمة int = r.nextInt(100);
يحاول
{
queue.put(new Integer(value));
System.out.println(Thread.currentThread().getName() + "---أدخل قيمة في قائمة الانتظار" + value);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
قبض (استثناء على سبيل المثال)
{
ex.printStackTrace();
}
}
}
};
getter القابل للتشغيل = Runnable () الجديد
{
تشغيل الفراغ العام ()
{
Random r = new Random();
بينما (صحيح)
{
يحاول
{
إذا (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---قائمة الانتظار فارغة");
}
آخر
{
قيمة int = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---احصل على القيمة من قائمة الانتظار" + value);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
قبض (استثناء على سبيل المثال)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
نتائج التنفيذ المحتملة هي كما يلي:
استخدم الإشارات للتحكم في المواضيع
يوفر JDK إشارة لتنفيذ وظيفة "الإشارة"، ويوفر طريقتين للحصول على الإشارات وإصدارها: الحصول على التعليمات البرمجية وإصدارها كما يلي:
لـ (int i = 0; i < 10; i++)
{
عداء قابل للتشغيل = Runnable () جديد
{
تشغيل الفراغ العام ()
{
يحاول
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Executing.");
Thread.sleep(5000);
semp.release();
}
قبض (استثناء على سبيل المثال)
{
ex.printStackTrace();
}
}
};
exec.execute(runner);
}
exec.shutdown();
}
لقد ذكرنا سابقًا أنه يمكن استخدام الكلمة الأساسية المتزامنة للتحكم في خطوات التنفيذ في مؤشر ترابط واحد، لذا إذا أردنا التحكم في خطوات تنفيذ جميع سلاسل الرسائل في تجمع سلاسل الرسائل، فكيف يجب أن ننفذها؟
لدينا طريقتان، إحداهما هي استخدام CyclicBarrier والأخرى هي استخدام CountDownLatch.
يستخدم CyclicBarrier آلية مشابهة لـ Object.wait، ويحتاج مُنشئه إلى الحصول على رقم صحيح للإشارة إلى عدد سلاسل العمليات التي يحتاج إلى التحكم فيها. وعندما يتم استدعاء أسلوب الانتظار الخاص به في طريقة تشغيل الخيط، فإنه سيضمن ذلك فقط وصلت المواضيع إلى هذه الخطوة فهل ستستمر في تنفيذ الخطوات اللاحقة.
رمز العينة كما يلي:
تشغيل الفراغ العام () {
Random r = new Random();
يحاول
{
لـ (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "wait.");
AWAIT();
}
}
قبض (استثناء على سبيل المثال)
{
ex.printStackTrace();
}
}
}
فراغ ثابت خاص cyclicBarrierTest ()
{
حاجز CyclicBarrier = جديد CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
لـ (int i = 0; i < 3; i++)
{
exec.execute(new MyRunner2(barrier));
}
exec.shutdown();
}
يستخدم CountDownLatch آلية مشابهة لـ "عداد العد التنازلي" للتحكم في سلاسل الرسائل في تجمع سلاسل الرسائل، وله طريقتان: CountDown وAwait. رمز العينة كما يلي: