from six import iteritems, string_types
from six.moves import map # pylint: disable=redefined-builtin
from .base import Resource
from .droplet import Droplet
from .floating_ip import FloatingIP
from .image import Image
resource_types = {
"droplets": Droplet,
# Not supported by DO yet, but it's good to be prepared:
"images": Image,
"floating_ips": FloatingIP,
}
[docs]class Tag(Resource):
r"""
.. versionadded:: 0.2.0
A tag resource, representing a label that can be applied to other
resources.
New tags are created via the :meth:`doapi.create_tag` method and can be
retrieved with the :meth:`doapi.fetch_tag` and :meth:`doapi.fetch_all_tags`
methods.
The DigitalOcean API specifies the following fields for tag objects:
:var name: the name of the tag
:vartype name: string
:var resources: a `dict` mapping resource types (e.g., ``"droplets"``) to
sub-`dict`\ s containing fields ``"count"`` (the number of resources of
the given type with the given tag) and ``"last_tagged"`` (the resource
of the given type to which the tag was most recently applied)
"""
def __init__(self, state=None, **extra):
if isinstance(state, string_types):
state = {"name": state}
super(Tag, self).__init__(state, **extra)
self.setdefault('resources', dict())
for name, cls in iteritems(resource_types):
if isinstance(self.resources.get(name), dict):
last_tagged = self.resources[name].get("last_tagged")
if last_tagged is not None and not isinstance(last_tagged, cls):
self.resources[name]["last_tagged"] = \
cls(last_tagged, doapi_manager=self.doapi_manager)
@property
def url(self):
""" The endpoint for general operations on the individual tag """
return self._url('/v2/tags/' + self.name)
[docs] def fetch(self):
"""
Fetch & return a new `Tag` object representing the tag's current state
:rtype: Tag
:raises DOAPIError: if the API endpoint replies with an error (e.g., if
the tag no longer exists)
"""
api = self.doapi_manager
return api._tag(api.request(self.url)["tag"])
[docs] def __str__(self):
""" Convert the tag object to its name """
return self.name
[docs] def update_tag(self, name):
# The `_tag` is to avoid conflicts with MutableMapping.update.
"""
Update (i.e., rename) the tag
:param str name: the new name for the tag
:return: an updated `Tag` object
:rtype: Tag
:raises DOAPIError: if the API endpoint replies with an error
"""
api = self.doapi_manager
return api._tag(api.request(self.url, method='PUT',
data={"name": name})["tag"])
[docs] def delete(self):
"""
Delete the tag
:return: `None`
:raises DOAPIError: if the API endpoint replies with an error
"""
self.doapi_manager.request(self.url, method='DELETE')
[docs] def add(self, *resources):
"""
Apply the tag to one or more resources
:param resources: one or more `Resource` objects to which tags can be
applied
:return: `None`
:raises DOAPIError: if the API endpoint replies with an error
"""
self.doapi_manager.request(self.url + '/resources', method='POST',
data={"resources": _to_taggable(resources)})
[docs] def remove(self, *resources):
"""
Remove the tag from one or more resources
:param resources: one or more `Resource` objects to which tags can be
applied
:return: `None`
:raises DOAPIError: if the API endpoint replies with an error
"""
self.doapi_manager.request(self.url + '/resources', method='DELETE',
data={"resources": _to_taggable(resources)})
[docs] def fetch_all_droplets(self):
r"""
Returns a generator that yields all of the droplets to which the tag is
currently applied
:rtype: generator of `Droplet`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.doapi_manager.fetch_all_droplets(tag_name=self.name)
[docs] def delete_all_droplets(self):
"""
Delete all of the droplets to which the tag is applied
:return: `None`
:raises DOAPIError: if the API endpoint replies with an error
"""
self.doapi_manager.request('/v2/droplets', method='DELETE',
params={"tag_name": self.name})
[docs] def act_on_droplets(self, **data):
r"""
Perform an arbitrary action on all of the droplets to which the tag is
applied. ``data`` will be serialized as JSON and POSTed to the proper
API endpoint. All currently-documented actions require the POST body
to be a JSON object containing, at a minimum, a ``"type"`` field.
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
api = self.doapi_manager
return map(api._action, api.request('/v2/droplets/actions', method='POST', params={"tag_name": self.name}, data=data)["actions"])
[docs] def power_cycle(self):
r"""
Power cycle all of the droplets to which the tag is applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='power_cycle')
[docs] def power_on(self):
r"""
Power on all of the droplets to which the tag is applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='power_on')
[docs] def power_off(self):
r"""
Power off all of the droplets to which the tag is applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='power_off')
[docs] def shutdown(self):
r"""
Shut down all of the droplets to which the tag is applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='shutdown')
[docs] def enable_private_networking(self):
r"""
Enable private networking on all of the droplets to which the tag is
applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='enable_private_networking')
[docs] def enable_ipv6(self):
r"""
Enable IPv6 networking on all of the droplets to which the tag is
applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='enable_ipv6')
[docs] def enable_backups(self):
r"""
Enable backups on all of the droplets to which the tag is applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='enable_backups')
[docs] def disable_backups(self):
r"""
Disable backups on all of the droplets to which the tag is applied
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='disable_backups')
[docs] def snapshot(self, name):
r"""
Create snapshot images of all of the droplets to which the tag is
applied
:param str name: the name for the new snapshots
:return: a generator of `Action`\ s representing the in-progress
operations on the droplets
:rtype: generator of `Action`\ s
:raises DOAPIError: if the API endpoint replies with an error
"""
return self.act_on_droplets(type='snapshot', name=name)
def _to_taggable(resources):
res = []
for r in resources:
try:
res.append(r._taggable())
except (AttributeError, TypeError):
if isinstance(r, Resource):
raise TypeError('Tagging {0!r} objects is not supported'
.format(r._class()))
else:
# Assume `r` is a "primitive" type
res.append(r)
return res