// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10 "context"
11 "errors"
12
13 "github.com/mongodb/mongo-go-driver/bson"
14 "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
15 "github.com/mongodb/mongo-go-driver/x/mongo/driver"
16)
17
18// Cursor is used to iterate a stream of documents. Each document is decoded into the result
19// according to the rules of the bson package.
20//
21// A typical usage of the Cursor type would be:
22//
23// var cur *Cursor
24// ctx := context.Background()
25// defer cur.Close(ctx)
26//
27// for cur.Next(ctx) {
28// elem := &bson.D{}
29// if err := cur.Decode(elem); err != nil {
30// log.Fatal(err)
31// }
32//
33// // do something with elem....
34// }
35//
36// if err := cur.Err(); err != nil {
37// log.Fatal(err)
38// }
39//
40type Cursor struct {
41 // Current is the BSON bytes of the current document. This property is only valid until the next
42 // call to Next or Close. If continued access is required to the bson.Raw, you must make a copy
43 // of it.
44 Current bson.Raw
45
46 bc batchCursor
47 pos int
48 batch []byte
49 registry *bsoncodec.Registry
50
51 err error
52}
53
54func newCursor(bc batchCursor, registry *bsoncodec.Registry) (*Cursor, error) {
55 if registry == nil {
56 registry = bson.DefaultRegistry
57 }
58 if bc == nil {
59 return nil, errors.New("batch cursor must not be nil")
60 }
61 return &Cursor{bc: bc, pos: 0, batch: make([]byte, 0, 256), registry: registry}, nil
62}
63
64func newEmptyCursor() *Cursor {
65 return &Cursor{bc: driver.NewEmptyBatchCursor()}
66}
67
68// ID returns the ID of this cursor.
69func (c *Cursor) ID() int64 { return c.bc.ID() }
70
71func (c *Cursor) advanceCurrentDocument() bool {
72 if len(c.batch[c.pos:]) < 4 {
73 c.err = errors.New("could not read next document: insufficient bytes")
74 return false
75 }
76 length := (int(c.batch[c.pos]) | int(c.batch[c.pos+1])<<8 | int(c.batch[c.pos+2])<<16 | int(c.batch[c.pos+3])<<24)
77 if len(c.batch[c.pos:]) < length {
78 c.err = errors.New("could not read next document: insufficient bytes")
79 return false
80 }
81 if len(c.Current) > 4 {
82 c.Current[0], c.Current[1], c.Current[2], c.Current[3] = 0x00, 0x00, 0x00, 0x00 // Invalidate the current document
83 }
84 c.Current = c.batch[c.pos : c.pos+length]
85 c.pos += length
86 return true
87}
88
89// Next gets the next result from this cursor. Returns true if there were no errors and the next
90// result is available for decoding.
91func (c *Cursor) Next(ctx context.Context) bool {
92 if ctx == nil {
93 ctx = context.Background()
94 }
95 if c.pos < len(c.batch) {
96 return c.advanceCurrentDocument()
97 }
98
99 // clear the batch
100 c.batch = c.batch[:0]
101 c.pos = 0
102 c.Current = c.Current[:0]
103
104 // call the Next method in a loop until at least one document is returned in the next batch or
105 // the context times out.
106 for len(c.batch) == 0 {
107 // If we don't have a next batch
108 if !c.bc.Next(ctx) {
109 // Do we have an error? If so we return false.
110 c.err = c.bc.Err()
111 if c.err != nil {
112 return false
113 }
114 // Is the cursor ID zero?
115 if c.bc.ID() == 0 {
116 return false
117 }
118 // empty batch, but cursor is still valid, so continue.
119 continue
120 }
121
122 c.batch = c.bc.Batch(c.batch[:0])
123 }
124
125 return c.advanceCurrentDocument()
126}
127
128// Decode will decode the current document into val.
129func (c *Cursor) Decode(val interface{}) error {
130 return bson.UnmarshalWithRegistry(c.registry, c.Current, val)
131}
132
133// Err returns the current error.
134func (c *Cursor) Err() error { return c.err }
135
136// Close closes this cursor.
137func (c *Cursor) Close(ctx context.Context) error { return c.bc.Close(ctx) }