Aggregation Pipeline in MongoDB

The aggregation pipeline in MongoDB processes documents through an ordered series of stages. Each stage takes the documents produced by the previous stage, performs one operation on them (such as filtering, grouping, or reshaping), and passes its output to the next stage, so complex transformations and analysis can run directly within the database.

// Sample collection structure
db.sales.insertMany([
  { item: "apple", quantity: 10, price: 5, date: new Date("2024-09-01") },
  { item: "banana", quantity: 20, price: 2, date: new Date("2024-09-02") },
  { item: "orange", quantity: 15, price: 4, date: new Date("2024-09-01") },
  { item: "apple", quantity: 5, price: 5, date: new Date("2024-09-03") }
]);

Key Aggregation Stages

$match:

  • Filters documents to pass only those that match the specified conditions.
  • Placed at the beginning of the pipeline, it reduces the number of documents that later stages must process and can take advantage of indexes.
db.sales.aggregate([
  { $match: { item: "apple" } }
]);

$group:

  • Groups documents by a specified field or expression (which becomes the _id of each group) and applies accumulator operators such as $sum or $avg to each group.
  • Useful for generating summary statistics.
db.sales.aggregate([
  {
    $group: {
      _id: "$item",
      totalQuantity: { $sum: "$quantity" }
    }
  }
]);

$project:

  • Reshapes documents by including, excluding, or adding new fields.
  • Allows you to compute new fields based on existing data and control which fields are passed to the next stage.
db.sales.aggregate([
  {
    $project: {
      item: 1,
      totalCost: { $multiply: ["$quantity", "$price"] }
    }
  }
]);

$sort:

  • Orders documents based on the values of specified fields.
  • Use 1 for ascending order and -1 for descending order.
db.sales.aggregate([
  { $sort: { price: -1 } }
]);

$limit:

  • Restricts the number of documents passed to the next stage.
  • Allows you to fetch only a specified number of results.
db.sales.aggregate([
  { $limit: 2 }
]);

$skip:

  • Skips a specified number of documents.
  • Often used alongside $limit to implement pagination.
db.sales.aggregate([
  { $skip: 1 }
]);
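
Pagination with $skip and $limit can be sketched as a small helper that builds the stages for a given page. The function name pageStages and its parameters are hypothetical, not part of MongoDB. The sketch also assumes a $sort on date with _id as a tie-breaker, since skipping is only meaningful over a consistent order.

```javascript
// Hypothetical helper: builds the stages for one page of results.
// page is 1-based; pageSize is the number of documents per page.
function pageStages(page, pageSize) {
  return [
    // Sort first so pages are stable; _id breaks ties between equal dates.
    { $sort: { date: 1, _id: 1 } },
    // Skip the documents that belong to earlier pages.
    { $skip: (page - 1) * pageSize },
    // Keep only the documents for this page.
    { $limit: pageSize }
  ];
}

// In mongosh, the result could be passed as: db.sales.aggregate(pageStages(2, 2))
console.log(JSON.stringify(pageStages(2, 2)));
```

Note that $skip comes before $limit here; reversing them would limit first and then skip within that smaller set.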

$unwind:

  • Deconstructs an array field from the documents into separate documents for each element in the array.
  • Effectively “flattens” the array.
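// Assumes each document has an array field named "items"; the sample sales documents above do not have one.
db.sales.aggregate([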
  {
    $unwind: "$items"
  }
]);
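
To make the effect of $unwind concrete, here is a plain JavaScript emulation of its default behavior. It is a sketch of the semantics, not how MongoDB implements the stage, and the orders documents and the unwind function are hypothetical.

```javascript
// Hypothetical documents with an "items" array (not part of the sample sales data).
const orders = [
  { _id: 1, customer: "A", items: ["apple", "banana"] },
  { _id: 2, customer: "B", items: [] },
  { _id: 3, customer: "C", items: ["orange"] }
];

// Plain JavaScript emulation of { $unwind: "$items" } with default options.
function unwind(docs, field) {
  return docs.flatMap((doc) => {
    const value = doc[field];
    // Missing or null fields produce no output documents by default.
    if (value === undefined || value === null) return [];
    // A non-array value is treated as a single-element array.
    if (!Array.isArray(value)) return [doc];
    // One output document per element; an empty array yields none.
    return value.map((element) => ({ ...doc, [field]: element }));
  });
}

const unwound = unwind(orders, "items");
console.log(unwound);
// Three documents: two for _id 1 (apple, banana), none for _id 2, one for _id 3.
```

In MongoDB itself, the document form of the stage accepts preserveNullAndEmptyArrays: true to keep documents such as _id 2 instead of dropping them.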

$lookup:

  • Performs a left outer join to another collection within the same database.
  • Adds an array field to each input document that holds the matching documents from the joined collection; the array is empty when nothing matches.
  • Used to merge data from different collections.
db.sales.aggregate([
  {
    $lookup: {
      from: "inventory",
      localField: "item",
      foreignField: "itemName",
      as: "inventoryDetails"
    }
  }
]);
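
The left outer join behavior can be illustrated with a plain JavaScript emulation. This is a sketch of the result shape only; the contents of the inventory collection (including the stock field) are assumptions made for illustration.

```javascript
const sales = [
  { item: "apple", quantity: 10 },
  { item: "banana", quantity: 20 },
  { item: "orange", quantity: 15 }
];

// Hypothetical inventory documents; "itemName" plays the role of foreignField.
const inventory = [
  { itemName: "apple", stock: 100 },
  { itemName: "banana", stock: 50 }
];

// Emulates the $lookup stage above for a simple equality match.
const joined = sales.map((sale) => ({
  ...sale,
  // Every sale is kept; unmatched sales get an empty array.
  inventoryDetails: inventory.filter((inv) => inv.itemName === sale.item)
}));

console.log(JSON.stringify(joined, null, 2));
```

Because orange has no inventory document in this sketch, it still appears in the output, with inventoryDetails set to an empty array.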

$addFields:

  • Adds or modifies fields in the documents.
  • Allows the creation of new fields based on existing data.
db.sales.aggregate([
  {
    $addFields: {
      discountPrice: { $multiply: ["$price", 0.9] }
    }
  }
]);

$replaceRoot:

  • Replaces the input document with the specified document.
  • Can be used to promote embedded documents to the top level.
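// Assumes "inventoryDetails" holds a single embedded document. After $lookup it is an array,
// so add { $unwind: "$inventoryDetails" } first; $replaceRoot fails if newRoot is not a document.
db.sales.aggregate([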
  {
    $replaceRoot: { newRoot: "$inventoryDetails" }
  }
]);

$count:

  • Returns a single document whose only field, named by the string you pass, holds the number of documents that reached the stage.
db.sales.aggregate([
  {
    $count: "totalSales"
  }
]);
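
The output shape of $count is easy to misread, so here is a plain JavaScript emulation of a $match followed by $count. The countStage function is a hypothetical stand-in for the stage, and the field name appleSales is chosen for illustration.

```javascript
const sales = [
  { item: "apple", quantity: 10, price: 5 },
  { item: "banana", quantity: 20, price: 2 },
  { item: "orange", quantity: 15, price: 4 },
  { item: "apple", quantity: 5, price: 5 }
];

// Emulates: [{ $match: { item: "apple" } }, { $count: "appleSales" }]
function countStage(docs, fieldName) {
  // $count emits one document, or nothing at all when it receives no input.
  return docs.length === 0 ? [] : [{ [fieldName]: docs.length }];
}

const apples = sales.filter((doc) => doc.item === "apple");
console.log(countStage(apples, "appleSales")); // [ { appleSales: 2 } ]
console.log(countStage([], "appleSales"));     // []
```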

$facet:

  • Allows multiple pipelines to process the same set of documents.
  • Each pipeline generates its own output within a single aggregate operation.
db.sales.aggregate([
  {
    $facet: {
      "priceStats": [{ $group: { _id: null, avgPrice: { $avg: "$price" } } }],
      "quantityStats": [{ $group: { _id: null, totalQuantity: { $sum: "$quantity" } } }]
    }
  }
]);
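
The result of $facet is a single document with one array-valued field per sub-pipeline. The following plain JavaScript sketch emulates the two sub-pipelines above over the sample data to show that shape; it is an illustration, not MongoDB's execution model.

```javascript
const sales = [
  { item: "apple", quantity: 10, price: 5 },
  { item: "banana", quantity: 20, price: 2 },
  { item: "orange", quantity: 15, price: 4 },
  { item: "apple", quantity: 5, price: 5 }
];

// Both sub-pipelines read the same input documents.
const avgPrice = sales.reduce((sum, doc) => sum + doc.price, 0) / sales.length;
const totalQuantity = sales.reduce((sum, doc) => sum + doc.quantity, 0);

// One output document; each facet name maps to that sub-pipeline's results.
const facetResult = [
  {
    priceStats: [{ _id: null, avgPrice }],
    quantityStats: [{ _id: null, totalQuantity }]
  }
];

console.log(JSON.stringify(facetResult));
// [{"priceStats":[{"_id":null,"avgPrice":4}],"quantityStats":[{"_id":null,"totalQuantity":50}]}]
```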

$bucket:

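  • Groups documents into categories or "buckets" based on specified boundaries; each bucket includes its lower boundary and excludes its upper one.
  • Documents whose value falls outside all boundaries go to the default bucket, if one is specified.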
  • Helpful for data segmentation.
db.sales.aggregate([
  {
    $bucket: {
      groupBy: "$price",
      boundaries: [0, 5, 10],
      default: "Other",
      output: { count: { $sum: 1 }, totalQuantity: { $sum: "$quantity" } }
    }
  }
]);
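
How documents are assigned to buckets can be sketched in plain JavaScript. The bucketByPrice function below is a hypothetical emulation of the stage above, using the sample data; it returns buckets in the order they are first encountered, which is not meant to reflect MongoDB's output order.

```javascript
const sales = [
  { item: "apple", quantity: 10, price: 5 },
  { item: "banana", quantity: 20, price: 2 },
  { item: "orange", quantity: 15, price: 4 },
  { item: "apple", quantity: 5, price: 5 }
];

// Emulates $bucket with groupBy: "$price" and the count/totalQuantity output.
function bucketByPrice(docs, boundaries, fallback) {
  const buckets = new Map();
  for (const doc of docs) {
    // Start with the default bucket, then look for a matching range.
    let id = fallback;
    for (let i = 0; i < boundaries.length - 1; i++) {
      // Lower boundary is inclusive, upper boundary is exclusive.
      if (doc.price >= boundaries[i] && doc.price < boundaries[i + 1]) {
        id = boundaries[i];
        break;
      }
    }
    const bucket = buckets.get(id) || { _id: id, count: 0, totalQuantity: 0 };
    bucket.count += 1;
    bucket.totalQuantity += doc.quantity;
    buckets.set(id, bucket);
  }
  return [...buckets.values()];
}

console.log(bucketByPrice(sales, [0, 5, 10], "Other"));
```

With the sample data, banana and orange (prices 2 and 4) land in the bucket with _id 0, while both apple documents (price 5) land in the bucket with _id 5, because 5 is the inclusive lower boundary of the second range.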

$bucketAuto:

  • Similar to $bucket, but automatically determines the boundaries based on the data and the specified number of buckets.
db.sales.aggregate([
  {
    $bucketAuto: {
      groupBy: "$price",
      buckets: 3
    }
  }
]);

$sortByCount:

  • Groups documents by the value of a specified expression and counts the documents in each group.
  • Returns the groups sorted by count in descending order.
db.sales.aggregate([
  { $sortByCount: "$item" }
]);
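
$sortByCount is shorthand for a $group followed by a $sort. The sketch below writes out that expansion as a pipeline and emulates it in plain JavaScript over the sample items; the variable names are chosen for illustration.

```javascript
const sales = [
  { item: "apple" },
  { item: "banana" },
  { item: "orange" },
  { item: "apple" }
];

// Expansion of { $sortByCount: "$item" } into two explicit stages.
const expanded = [
  { $group: { _id: "$item", count: { $sum: 1 } } },
  { $sort: { count: -1 } }
];

// Plain JavaScript emulation: count per item, then sort by count descending.
const counts = new Map();
for (const { item } of sales) {
  counts.set(item, (counts.get(item) || 0) + 1);
}
const ranked = [...counts]
  .map(([_id, count]) => ({ _id, count }))
  .sort((a, b) => b.count - a.count);

console.log(ranked[0]); // { _id: 'apple', count: 2 }
```

In mongosh, db.sales.aggregate(expanded) should produce the same groups as the $sortByCount stage; the relative order of groups with equal counts is not guaranteed.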

$geoNear:

  • Returns documents sorted by proximity to a specified geospatial point.
  • Useful for location-based queries.
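// Requires a geospatial index (such as 2dsphere) on a location field, which the sample
// sales documents do not have; $geoNear must also be the first stage in the pipeline.
db.sales.aggregate([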
  {
    $geoNear: {
      near: { type: "Point", coordinates: [0, 0] },
      distanceField: "dist.calculated"
    }
  }
]);

$redact:

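  • Restricts document contents based on information stored in the documents themselves, keeping or pruning each document or embedded document according to specified criteria.
// Assumes documents carry a "status" field; any document whose status is not "A" is pruned entirely.
db.sales.aggregate([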
  {
    $redact: {
      $cond: {
        if: { $eq: ["$status", "A"] },
        then: "$$DESCEND",
        else: "$$PRUNE"
      }
    }
  }
]);

$merge:

  • Writes the results of the aggregation pipeline to a specified collection.
  • Merges results into matching existing documents or inserts new ones, as controlled by whenMatched and whenNotMatched; must be the last stage in the pipeline.
db.sales.aggregate([
  {
    $merge: {
      into: "sales_summary",
      whenMatched: "merge",
      whenNotMatched: "insert"
    }
  }
]);

$out:

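  • Writes the results of the aggregation pipeline to a collection; if the collection already exists, $out replaces its contents with the results. Must be the last stage in the pipeline.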
  • Allows you to save the results of an aggregation for further use.
db.sales.aggregate([
  {
    $out: "sales_aggregated"
  }
]);

Summary

The aggregation pipeline transforms and analyzes data inside MongoDB. By chaining stages such as $match, $group, and $sort, you can filter, summarize, and reshape documents in a single operation to meet a range of analytical and reporting needs.
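
As a closing illustration of chaining, the sketch below defines a four-stage pipeline over the sample sales data and emulates each stage in plain JavaScript. The revenue field name and the quantity threshold are assumptions made for this example.

```javascript
const sales = [
  { item: "apple", quantity: 10, price: 5 },
  { item: "banana", quantity: 20, price: 2 },
  { item: "orange", quantity: 15, price: 4 },
  { item: "apple", quantity: 5, price: 5 }
];

// In mongosh this could be run as: db.sales.aggregate(pipeline)
const pipeline = [
  { $match: { quantity: { $gte: 10 } } },
  { $group: { _id: "$item", revenue: { $sum: { $multiply: ["$quantity", "$price"] } } } },
  { $sort: { revenue: -1 } },
  { $limit: 2 }
];

// Plain JavaScript emulation of the same four steps.
const matched = sales.filter((doc) => doc.quantity >= 10);          // $match
const totals = new Map();
for (const doc of matched) {                                        // $group
  totals.set(doc.item, (totals.get(doc.item) || 0) + doc.quantity * doc.price);
}
const result = [...totals]
  .map(([_id, revenue]) => ({ _id, revenue }))
  .sort((a, b) => b.revenue - a.revenue)                            // $sort
  .slice(0, 2);                                                     // $limit

console.log(result);
// [ { _id: 'orange', revenue: 60 }, { _id: 'apple', revenue: 50 } ]
```

Filtering with $match first means the later stages only see the three documents with a quantity of at least 10.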