این آموزش سیستم های نوبت دهی را توضیح می دهد و نشان می دهد. صف ها اغلب برای پردازش کارهای طولانی مدت مانند تحویل خبرنامه از طریق ایمیل استفاده می شوند.

اجرای یک وظیفه در لحظه ای که درخواست می شود همیشه عملی نیست.

یک سیستم مدیریت خبرنامه ایمیل را در نظر بگیرید. پس از نوشتن ، مدیر باید دکمه قرمز “ارسال اکنون” را فشار دهد. برنامه میتوانست بلافاصله هر ایمیل را ارسال کنید و پاسخ “تکمیل شده” را نشان دهید. این برای ده پیام مفید است ، اما برای 1000 مشترک یا بیشتر چه مدت طول می کشد؟ درخواست مرورگر قبل از اتمام روند به پایان می رسد.

مثال دیگر: کاربر می تواند هر تعداد عکس را در یک برنامه گالری بارگذاری کند. این سیستم اندازه و وضوح هر تصویر را برای ابعاد جایگزین تغییر می دهد. این فرآیند می تواند در هنگام بارگذاری اجرا شود ، اما برای هر تصویر تأخیر ایجاد می شود.

در این شرایط جدا کردن وظایف موثرتر است. کاربر یک پاسخ فوری دریافت می کند اما پردازش کار در پس زمینه انجام می شود. سایر برنامه ها یا سرورها وظایف را کنترل می کنند و برای شکست دوباره برنامه ریزی می کنند. کاربر می تواند هشدارها را دریافت کند یا سیاهههای مربوط را بررسی کند تا پیشرفت را تعیین کند.

صف

آ صف یک ساختار داده است که مجموعه ای از موارد را در خود نگه می دارد:

  • هر فرایندی می تواند ارسال کند (یا احاطه کردن) یک مورد در هر زمان – مانند ارسال خبرنامه X به گیرنده Y.
  • هر فرایندی می تواند دریافت کند (یا کاهش) موردی که در جلوی صف قرار دارد – به عنوان مثال ، موردی که بیشترین مدت در صف بوده است.

صف ها یک ساختار اولین در اولین ورود (FIFO) است. اولین موردی که به صف اضافه شده اولین کالایی است که به فروش می رسد.

اجرای صف پایه JavaScript

با استفاده از آرایه JavaScript می توانید صف ایجاد کنید. push() روش یک مورد را به انتهای Array اضافه می کند در حالی که shift() روش از ابتدا موردی را حذف و برمی گرداند:

const queue = [];

queue.push( 'item 1' );
queue.push( 'item 2' );

console.log( queue.shift() ); 
console.log( queue.shift() ); 
console.log( queue.shift() ); 

عناصر آرایه های جداگانه می توانند هر داده ای را نگه دارند. می توانید رشته ها ، اعداد ، Booleans ، سایر آرایه ها یا اشیا را فشار دهید.

برای تعریف تعداد صف های جداگانه می توانید از کلاس ES6 استفاده کنید:

class Queue {

  constructor() { this.q = []; }
  send( item )  { this.q.push( item ); }
  receive()     { return this.q.shift(); }

}


const q1 = new Queue();
const q2 = new Queue();

q1.send('item 1');
q2.send('item 2');

console.log( q1.receive() ); 
console.log( q1.receive() ); 
console.log( q2.receive() ); 

این مثالهای ساده ممکن است برای کد سمت مشتری که از اهمیت بالاتری برخوردار است مانند به روزرسانی رابط کاربر صف در دسترس باشد ، بنابراین پردازش در یک به روزرسانی DOM انجام می شود. محلی ذخیره سازی یا نمایه شده در صورت لزوم می تواند سطح پایداری داده را ارائه دهد.

سیستم عامل های صف

صفهای حافظه برای کاربردهای پیچیده سرور کاربرد کمتری دارند:

  1. دو یا چند برنامه جداگانه نمی توانند (به راحتی) به صف یکسان دسترسی پیدا کنند.
  2. با خاتمه برنامه ، داده های صف از بین می روند.

نرم افزار کارگزار پیام ساخته شده هدفمند صف های محکم تری را فراهم می کند. سیستم عامل ها متفاوت هستند ، اما ویژگی هایی از جمله:

  • ماندگاری داده ها در انتخاب پایگاه های داده با گزینه های تکثیر ، تقسیم بندی و خوشه بندی
  • طیف وسیعی از پروتکل های دسترسی ، اغلب شامل HTTP و Web Sockets
  • هر تعداد صف جداگانه
  • تأخیر در پیام رسانی ، جایی که پردازش پیام می تواند در زمان دیگری انجام شود
  • پشتیبانی مانند معامله ، جایی که پیام هنگام پردازش تأیید نشده دوباره در صف قرار می گیرد
  • الگوهای انتشار-اشتراک ، جایی که برنامه ها هنگام ظاهر شدن مورد جدید در صف ، رویدادی را دریافت می کنند

نرم افزار پیام دهنده شامل پیام است Redis، RabbitMQ، Apache ActiveMQ، و دنده. خدمات پیام رسان ابری شامل SQS آمازون، اتوبوس لاجورد سرویس، و Google Pub / Sub.

این گزینه ها می توانند گزینه های مناسبی برای برنامه های سطح سازمانی باشند. اگر نیازهای ساده تری داشته باشید و قبلاً از یک پایگاه داده استفاده کرده باشید ، این ممکن است بیش از حد مجاز باشد.

از MongoDB به عنوان یک پیام رسان استفاده کنید

توسعه یک سیستم صف پیچیده در چند صد خط کد Node.js امکان پذیر است.

queue-mongodb ماژولی که در اینجا شرح داده می شود MongoDB برای ذخیره اطلاعات ، اما مفاهیم مشابه می توانند توسط هر پایگاه داده SQL یا NoSQL پذیرفته شوند. کد در دسترس است GitHub و بالاتر از سطح دریا.

شروع سریع

مطمئن شوید که … را دارید Node.js 14 یا بالاتر نصب شده است ، سپس یک پوشه پروژه جدید مانند ایجاد کنید queue-test. جدیدی اضافه کنید package.json فایل:

{
  "name": "queue-test",
  "version": "1.0.0",
  "description": "Queue test",
  "type": "module",
  "scripts": {
    "send": "node ./send.js",
    "receive": "node ./receive.js"
  }
}

توجه داشته باشید: "type": "module" پروژه را برای استفاده از ماژول های ES6 پیکربندی می کند. "scripts" موارد صف را ارسال و دریافت می کند.

نصب کنید صف-مونگودب مدول:

npm install @craigbuckler/queue-mongodb

سپس یک ایجاد کنید .env پرونده را با اطلاعات کاربری اتصال پایگاه داده MongoDB خود وارد کنید. مثلا:

QUEUE_DB_HOST=localhost
QUEUE_DB_PORT=27017
QUEUE_DB_USER=root
QUEUE_DB_PASS=mysecret
QUEUE_DB_NAME=qdb
QUEUE_DB_COLL=queue

توجه: این یک ایجاد می کند queue مجموعه (QUEUE_DB_COLL) در qdb پایگاه داده (QUEUE_DB_NAME) می توانید از یک پایگاه داده موجود استفاده کنید ، اما اطمینان حاصل کنید که مجموعه با دیگری تضاد ندارد.

دسترسی به خواندن / نوشتن پایگاه داده باید به کاربر اعطا شود root (QUEUE_DB_USER) با رمز عبور mysecret (QUEUE_DB_PASS) در صورت عدم احراز هویت ، هر دو مقدار را خالی تنظیم کنید.

اگر یک پایگاه داده MongoDB در حال اجرا نیست ، شروع کنید. کسانی که داکر و داکر آهنگ سازی می تواند جدید ایجاد کند docker-compose.yml فایل:

version: '3'

services:

  queuedb:
    environment:
      - MONGO_INITDB_ROOT_USERNAME=${QUEUE_DB_USER}
      - MONGO_INITDB_ROOT_PASSWORD=${QUEUE_DB_PASS}
    image: mongo:4.4-bionic
    container_name: queuedb
    volumes:
      - queuedata:/data/db
    ports:
      - "${QUEUE_DB_PORT}:${QUEUE_DB_PORT}"
    restart: always

volumes:
  queuedata:

سپس بدوید docker-compose up برای بارگیری و شروع MongoDB با حجم مداوم داده.

Docker در دسترس Linux ، macOS و Windows 10 است دستورالعمل نصب داکر.

جدید ایجاد کنید send.js پرونده را برای اضافه کردن پیام های ایمیل به طور تصادفی ایجاد شده به یک صف به نام news:


import { Queue } from '@craigbuckler/queue-mongodb';


const newsQ = new Queue('news');


const name = String.fromCharCode(65 + Math.random() * 26).repeat(1 + Math.random() * 10);


const send = await newsQ.send({
  name:     name,
  email:    `${ name.toLowerCase() }@test.com`,
  date:     new Date(),
  message:  `Hey there, ${ name }!`
});

console.log('send', send);


console.log('items queued:', await newsQ.count());


await newsQ.close();

با آن اجرا کنید npm run send و خروجی مانند این را مشاهده خواهید کرد:

send {
  _id: 607d692563bd6d05bb459931,
  sent: 2021-04-19T11:27:33.000Z,
  data: {
    name: 'AAA',
    email: 'aaa@test.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
items queued: 1

.send() متد برمی گرداند qItem شی object حاوی:

  1. سند MongoDB _id
  2. تاریخ / زمان قرار گرفتن کالا در صف اصلی ، و
  3. یک کپی از پیام data

اسکریپت را هر چند بار اجرا کنید تا موارد دیگری را به صف اضافه کنید. items queued در هر اجرا افزایش می یابد.

اکنون یک جدید ایجاد کنید receive.js پرونده را برای بازیابی پیام ها از همان صف:


import { Queue } from '@craigbuckler/queue-mongodb';


const newsQ = new Queue('news');

let qItem;

do {

  qItem = await newsQ.receive();

  if (qItem) {

    console.log('nreceive', qItem);

    
    

  }

} while (qItem);


console.log('items queued:', await newsQ.count());

await newsQ.close();

اجرا کن npm run receive برای واکشی و پردازش موارد در صف:

receive {
  _id: 607d692563bd6d05bb459931,
  sent: 2021-04-19T11:27:33.000Z,
  data: {
    name: 'AAA',
    email: 'aaa@test.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
items queued: 0

در این مثال هیچ ایمیلی ارسال نمی شود ، اما می توان با استفاده از آن پیاده سازی کرد Nodemailer یا یک ماژول مناسب دیگر

اگر پردازش ناموفق بود – شاید به این دلیل که سرور نامه از کار افتاده است – یک مورد را می توان با این صف دوباره قرار داد:

newsQ.send( qItem.data, 600 );

دومین 600 آرگومان تعداد اختیاری ثانیه یا تاریخ آینده است. این دستور پس از گذشت 600 ثانیه (ده دقیقه) مجدداً مورد را در صف قرار می دهد.

این یک مثال ساده است ، اما هر برنامه ای می تواند داده ها را به تعداد صف ارسال کند. روند دیگری ، شاید به عنوان یک شروع شده است cron شغل ، می تواند موارد را در صورت لزوم دریافت و پردازش کند.

چگونه queue-mongodb ماژول کار می کند

type رشته ای که به سازنده کلاس منتقل می شود ، یک نام صف را تعریف می کند. .send() روش هنگام ارسال داده ها ، یک سند MongoDB جدید ایجاد می کند تا به صف اضافه شود. سند MongoDB شامل موارد زیر است:

  1. یک MongoDB _id (تاریخ / زمان ایجاد در مقدار رمزگذاری شده است).
  2. صف type.
  3. یک مقدار تاریخ / زمان پردازش به نام proc. تعیین زمان آینده امکان پذیر است اما زمان فعلی پیش فرض است.
  4. مورد data. این می تواند هر چیزی باشد: بولی ، عدد ، رشته ، آرایه ، شی و غیره.

.receive() متد قدیمی ترین سندی را پیدا می کند که دارای تطبیق باشد type و یک proc تاریخ / زمان در گذشته سند قالب بندی می شود ، به کد تماس برگردانده می شود و از پایگاه داده حذف می شود.

بخشهای زیر ماژول را توصیف کنید با جزئیات بیشتر

queue-mongodb ماژول: مقداردهی اولیه

dotenv مدول می خواند .env در صورت لزوم متغیرهای محیطی. یک شی اتصال پایگاه داده با استفاده از رسمی ایجاد می شود mongodb ماژول درایور:


import dotenv from 'dotenv';
import mongoDB from 'mongodb';


if (!process.env.QUEUE_DB_HOST) {
  dotenv.config();
}


const
  dbName = process.env.QUEUE_DB_NAME || 'qdb',
  qCollectionName = process.env.QUEUE_DB_COLL || 'queue',
  qAuth = process.env.QUEUE_DB_USER ? `${ process.env.QUEUE_DB_USER }:${ process.env.QUEUE_DB_PASS || '' }@` : '',

  dbClient = new mongoDB.MongoClient(
    `mongodb://${ qAuth }${ process.env.QUEUE_DB_HOST || 'localhost' }:${ process.env.QUEUE_DB_PORT || '27017' }/`,
    { useNewUrlParser: true, useUnifiedTopology: true }
  );

qCollection متغیر یک ارجاع به مجموعه صف پایگاه داده دارد (تعریف شده توسط QUEUE_DB_COLL) توسط ایجاد شده و بازگردانده می شود dbConnect() تابع ، که همچنین برنامه جمع آوری و شاخص ها را در صورت لزوم تعریف می کند. همه Queue روش ها اجرا می شود const q = await dbConnect(); برای دریافت مرجع مجموعه:

let qCollection; 



async function dbConnect() {

  
  if (qCollection) return qCollection;

  
  await dbClient.connect();

  
  const
    db = dbClient.db( dbName ),
    colList = await db.listCollections({ name: qCollectionName }, { nameOnly: true }).toArray();

  if (!colList.length) {

    
    let $jsonSchema = {
      bsonType: 'object',
      required: [ 'type', 'proc', 'data' ],
      properties: {
        type: { bsonType: 'string', minLength: 1 },
        proc: { bsonType: 'date' }
      }
    };
    await db.createCollection(qCollectionName, { validator: { $jsonSchema } });

    
    await db.collection( qCollectionName ).createIndexes([
      { key: { type: 1 } },
      { key: { proc: 1 } }
    ]);

  }

  
  qCollection = db.collection( qCollectionName );
  return qCollection;

}

dbClose() تابع اتصال پایگاه داده را می بندد:


async function dbClose() {

  if (qCollection) {
    await dbClient.close();
    qCollection = null;
  }

}

queue-mongodb مدول: Queue سازنده

Queue سازنده صف را تنظیم می کند type یا نام:

export class Queue {

  constructor(type = 'DEFAULT') {

    this.type = type;

  }

queue-mongodb مدول: Queue.send() روش

.send() روش داده ها را با صفات مناسب به صف اضافه می کند type. این یک اختیاری است delayUntil پارامتر ، که با تعیین تعداد ثانیه ها یا یک مورد ، یک آیتم را به صف اضافه می کند Date().

این روش یک سند جدید را وارد پایگاه داده می کند و a را برمی گرداند qItem هدف – شی ( { _id، sent، data} ) یا null در صورت عدم موفقیت:

  async send(data = null, delayUntil) {

    try {

      
      let proc = new Date();
      if (delayUntil instanceof Date) {
        proc = delayUntil;
      }
      else if (!isNaN(delayUntil)) {
        proc = new Date( +proc + delayUntil * 1000);
      }

      
      const
        q     = await dbConnect(),
        ins   = await q.insertOne({
          type: this.type, proc, data
        });

      
      return ins && ins.insertedCount && ins.insertedId ? { _id: ins.insertedId, sent: ins.insertedId.getTimestamp(), data } : null;

    }
    catch(err) {

      console.log(`Queue.send error:n${ err }`);
      return null;

    }

  }

queue-mongodb مدول: Queue.receive() روش

.receive() متد قدیمی ترین مورد در پایگاه داده را با یک خاص بازیابی و حذف می کند type و یک proc تاریخ / زمان در گذشته آن را برمی گرداند qItem هدف – شی ( {_id، sent، data} ) یا null اگر چیزی در دسترس نیست یا خطایی رخ می دهد:

  async receive() {

    try {

      
      const
        now = new Date(),
        q   = await dbConnect(),
        rec = await q.findOneAndDelete(
          {
            type: this.type,
            proc: { $lt: now }
          },
          {
            sort: { proc: 1 }
          }
        );

      const v = rec && rec.value;

      
      return v ? { _id: v._id, sent: v._id.getTimestamp(), data: v.data } : null;

    }
    catch(err) {

      console.log(`Queue.receive error:n${ err }`);
      return null;

    }

  }

queue-mongodb مدول: Queue.remove() روش

.remove() روش آیتم مشخص شده توسط a را حذف می کند qItem هدف – شی ( {_id، sent، data}) توسط .send() روش. بدون در نظر گرفتن موقعیت آن در صف ، می توان از آن برای حذف مورد استفاده کرد.

این روش تعداد اسناد حذف شده (به طور معمول 1) یا null هنگام بروز خطا:

  async remove(qItem) {

    
    if (!qItem || !qItem._id) return null;

    try {

      const
        q   = await dbConnect(),
        del = await q.deleteOne({ _id: qItem._id });

      return del.deletedCount;

    }
    catch(err) {

      console.log(`Queue.remove error:n${ err }`);
      return null;

    }

  }

queue-mongodb مدول: Queue.purge() روش

.purge() متد همه موارد را از همان صف حذف می کند type و تعداد حذف ها را برمی گرداند:

  async purge() {

    try {

      const
        q   = await dbConnect(),
        del = await q.deleteMany({ type: this.type });

      return del.deletedCount;

    }
    catch(err) {

      console.log(`Queue.purge error:n${ err }`);
      return null;

    }

  }

queue-mongodb مدول: Queue.count() روش

.count() روش تعداد موارد صف شده از همان را برمی گرداند type:

  async count() {

    try {

      const q = await dbConnect();
      return await q.countDocuments({ type: this.type });

    }
    catch(err) {

      console.log(`Queue.count error:n${ err }`);
      return null;

    }

  }

queue-mongodb مدول: Queue.close() روش

.close() روش اجرا می شود dbClose() عملکرد برای خاتمه اتصال پایگاه داده تا حلقه رویداد Node.js پایان یابد:

  async close() {

    try {

      await dbClose();

    }
    catch(err) {

      console.log(`Queue.close error:n${ err }`);
      return null;

    }

  }


}

یک صف جدید

صف در نظر گرفتن هر برنامه وب با عملکردهای گران محاسباتی است که می تواند باعث ایجاد تنگنا شود. آنها می توانند با جدا کردن برنامه ها در فرآیندهای کوچکتر ، سریعتر و مقاوم ، عملکرد و نگهداری را بهبود بخشند. نرم افزار اختصاصی کارگزار پیام یک گزینه است ، اما سیستم های نوبت دهی ساده در چند ده کد کد امکان پذیر است.