blob: c622f9b1f1ac9ad2e93e41f25bdcfb742b19e0ae [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package driver
8
9import (
10 "context"
11
12 "github.com/mongodb/mongo-go-driver/mongo/options"
13 "github.com/mongodb/mongo-go-driver/x/bsonx"
14
15 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
16 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
17 "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
18 "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid"
19 "github.com/mongodb/mongo-go-driver/x/network/command"
20 "github.com/mongodb/mongo-go-driver/x/network/description"
21 "github.com/mongodb/mongo-go-driver/x/network/result"
22)
23
24// Delete handles the full cycle dispatch and execution of a delete command against the provided
25// topology.
26func Delete(
27 ctx context.Context,
28 cmd command.Delete,
29 topo *topology.Topology,
30 selector description.ServerSelector,
31 clientID uuid.UUID,
32 pool *session.Pool,
33 retryWrite bool,
34 opts ...*options.DeleteOptions,
35) (result.Delete, error) {
36
37 ss, err := topo.SelectServer(ctx, selector)
38 if err != nil {
39 return result.Delete{}, err
40 }
41
42 // If no explicit session and deployment supports sessions, start implicit session.
43 if cmd.Session == nil && topo.SupportsSessions() && writeconcern.AckWrite(cmd.WriteConcern) {
44 cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
45 if err != nil {
46 return result.Delete{}, err
47 }
48 defer cmd.Session.EndSession()
49 }
50
51 deleteOpts := options.MergeDeleteOptions(opts...)
52 if deleteOpts.Collation != nil {
53 if ss.Description().WireVersion.Max < 5 {
54 return result.Delete{}, ErrCollation
55 }
56 cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(deleteOpts.Collation.ToDocument())})
57 }
58
59 // Execute in a single trip if retry writes not supported, or retry not enabled
60 if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
61 if cmd.Session != nil {
62 cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
63 }
64 return delete(ctx, cmd, ss, nil)
65 }
66
67 cmd.Session.RetryWrite = retryWrite
68 cmd.Session.IncrementTxnNumber()
69
70 res, originalErr := delete(ctx, cmd, ss, nil)
71
72 // Retry if appropriate
73 if cerr, ok := originalErr.(command.Error); ok && cerr.Retryable() ||
74 res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError) {
75 ss, err := topo.SelectServer(ctx, selector)
76
77 // Return original error if server selection fails or new server does not support retryable writes
78 if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
79 return res, originalErr
80 }
81
82 return delete(ctx, cmd, ss, cerr)
83 }
84 return res, originalErr
85}
86
87func delete(
88 ctx context.Context,
89 cmd command.Delete,
90 ss *topology.SelectedServer,
91 oldErr error,
92) (result.Delete, error) {
93 desc := ss.Description()
94
95 conn, err := ss.Connection(ctx)
96 if err != nil {
97 if oldErr != nil {
98 return result.Delete{}, oldErr
99 }
100 return result.Delete{}, err
101 }
102
103 if !writeconcern.AckWrite(cmd.WriteConcern) {
104 go func() {
105 defer func() { _ = recover() }()
106 defer conn.Close()
107
108 _, _ = cmd.RoundTrip(ctx, desc, conn)
109 }()
110
111 return result.Delete{}, command.ErrUnacknowledgedWrite
112 }
113 defer conn.Close()
114
115 return cmd.RoundTrip(ctx, desc, conn)
116}