Source code for tfs.resources

# -*- coding: utf-8 -*-
"""
TFS API python 3 version
"""
import os
from copy import deepcopy

from requests.structures import CaseInsensitiveDict


class TFSObject(object):
    """
    Base wrapper around one JSON object returned by the TFS REST API.

    The raw JSON is kept in ``data`` and can be read with ``obj[key]`` or
    ``obj.get(key)``. Every key of the ``_links`` section of the JSON is also
    exposed as an attribute: reading it fetches the linked resource through
    ``tfs.get_tfs_object``.

    :param data: decoded JSON of the object (a dict); ``None`` is treated
        as an empty dict
    :param tfs: client object used to follow links (must provide
        ``get_tfs_object``)
    :param uri: stored as-is in ``self.uri``
    """

    def __init__(self, data=None, tfs=None, uri=''):
        # TODO: CaseInsensitive Dict
        # ``data`` defaults to None, but every lookup below needs a mapping.
        self.data = {} if data is None else data
        self.tfs = tfs
        self.id = self.data.get('id', None)
        self.uri = uri
        self.url = self.data['url'] if 'url' in self.data else None
        self._data = self.data  # legacy, some people can use private method

    def __dir__(self):
        """
        Extend standard dir() with the attribute names found in ``_links``

        :return: extended list of attribute names
        """
        original = super(TFSObject, self).__dir__()
        extend = list(self.data.get('_links', {}))
        return original + extend

    def __get_object_by_links(self, name):
        """
        Fetch the resource that the ``_links`` entry ``name`` points to

        :param name: key inside ``_links``
        :return: whatever ``tfs.get_tfs_object`` returns for the link's href
        """
        links = self.data.get('_links', {})  # or empty if _links does not exist
        url = links[name]['href']
        return self.tfs.get_tfs_object(url)

    def __getattr__(self, name):
        """
        If the object has no such attribute, search in ``_links`` and return
        the linked object

        :param name: attribute name that normal lookup did not find
        :return: linked object fetched through ``tfs.get_tfs_object``
        :raises AttributeError: if ``name`` is not a key of ``_links``
        """
        # Read ``data`` straight from the instance dict. Going through
        # ``self.data`` would call __getattr__ again when ``data`` is not set
        # yet (instance created without __init__, e.g. by copy or pickle)
        # and recurse until RecursionError.
        data = self.__dict__.get('data')
        if data is not None and name in data.get('_links', {}):
            return self.__get_object_by_links(name)
        raise AttributeError("'{}' object has no attribute '{}'".format(self.__class__.__name__, name))

    # TODO: implement better repr

    def __getitem__(self, key):
        return self.data[key]

    def __setitem__(self, key, value):
        """
        Item assignment is not supported on the base class; subclasses
        implement it where it makes sense

        :param key:
        :param value:
        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def get(self, key, default=None):
        """
        Return ``data[key]`` or ``default`` when the key is missing

        :param key: key of the raw JSON
        :param default: value returned when ``key`` is absent
        """
        return self.data.get(key, default)
class Workitem(TFSObject):
    """
    One work item.

    ``fields`` holds ``data['fields']`` in a case-insensitive dict. Field
    lookups through ``wi[key]`` / ``wi.get(key)`` first try the key as given
    and then retry with the ``System.`` prefix, so ``wi['History']`` reads
    ``System.History``. Assigning ``wi[key] = value`` sends the change through
    ``tfs.update_workitem`` and reloads the object from the response.
    """

    def __init__(self, data=None, tfs=None, uri=''):
        super().__init__(data, tfs, uri)

        # Use prefix in automatically lookup.
        # We don't need use wi['System.History'], we use simple wi['History']
        self._system_prefix = 'System.'

        self.id = self.data['id']
        self.fields = CaseInsensitiveDict(self.data['fields'])
        self._fields = self.fields

    def __setitem__(self, key, value):
        """
        Update one field and reload this object from the response

        :param key: field name, used verbatim in the ``/fields/<key>`` path
        :param value: new field value
        """
        field_path = "/fields/{}".format(key)
        update_data = [dict(op="add", path=field_path, value=value)]
        raw = self.tfs.update_workitem(self.id, update_data)
        # Re-run __init__ on the fresh JSON; pass ``uri`` along so the
        # refresh does not reset it to the default ''.
        self.__init__(raw, self.tfs, self.uri)

    def get(self, key, default=None):
        """
        Return a field value, or ``default`` if neither ``key`` nor the
        ``System.``-prefixed key exists

        :param key: field name, with or without the ``System.`` prefix
        :param default: value returned when the field is missing
        """
        if key in self.fields:
            return self.fields[key]

        # try to automatically add prefix
        return self.fields.get(self._add_prefix(key), default)

    def __getitem__(self, key):
        """
        Return a field value

        :param key: field name, with or without the ``System.`` prefix
        :raises KeyError: if neither form of the key exists
        """
        if key in self.fields:
            return self.fields[key]

        # try to automatically add prefix
        return self.fields[self._add_prefix(key)]

    def _add_prefix(self, key):
        """Return ``key`` with the ``System.`` prefix, adding it if missing"""
        if key.startswith(self._system_prefix):
            return key
        return self._system_prefix + key

    def _remove_prefix(self, key):
        """Return ``key`` without a leading ``System.`` prefix"""
        if key.startswith(self._system_prefix):
            return key[len(self._system_prefix):]
        return key

    @property
    def field_names(self):
        """Names of all fields, with the ``System.`` prefix stripped"""
        return [self._remove_prefix(name) for name in self.fields]

    @property
    def history(self):
        """Object behind the ``workItemHistory`` entry of ``_links``"""
        return self.workItemHistory

    @property
    def revisions(self):
        """Object behind the ``workItemRevisions`` entry of ``_links``"""
        return self.workItemRevisions

    def find_in_relation(self, relation_type):
        """
        Get relations by type name.

        The name matches when the relation's ``rel`` value ends with it, so
        both the full name and a suffix work: ``'Hierarchy-Forward'`` finds
        ``'System.LinkTypes.Hierarchy-Forward'``, ``'AttachedFile'`` finds
        ``'AttachedFile'``.

        :param relation_type: full relation name or its trailing part
        :return: list of matching relation dicts (empty if none)
        """
        # ``endswith`` is also true for an exact match, so no separate
        # equality test is needed.
        return [relation for relation in self.data.get('relations', [])
                if relation.get('rel', '').endswith(relation_type)]

    def _find_in_relation(self, relation_type, return_one=True):
        """
        Find work item ids referenced by relations of the given type.

        The id is the last path segment of the relation's ``url``.

        :param relation_type: see :meth:`find_in_relation`
        :param return_one: return only the first id (or ``None``) instead of
            the whole list; used for the parent work item
        :return: int or ``None`` when ``return_one`` is true, else list of int
        """
        ids = [int(relation['url'].split('/')[-1])
               for relation in self.find_in_relation(relation_type)]

        if return_one:
            return ids[0] if ids else None
        return ids

    @property
    def parent_id(self):
        """Id from the first ``Hierarchy-Reverse`` relation, or ``None``"""
        return self._find_in_relation('Hierarchy-Reverse', return_one=True)

    @property
    def parent(self):
        """Result of ``tfs.get_workitem`` for ``parent_id``, or ``None``"""
        # Evaluate the property once: each access scans the relations.
        parent_id = self.parent_id
        if parent_id is None:
            return None
        return self.tfs.get_workitem(parent_id)

    @property
    def child_ids(self):
        """Ids from all ``Hierarchy-Forward`` relations"""
        return self._find_in_relation('Hierarchy-Forward', return_one=False)

    @property
    def childs(self):
        """Result of ``tfs.get_workitems`` for ``child_ids``, or ``[]``"""
        # Evaluate the property once: each access scans the relations.
        child_ids = self.child_ids
        if child_ids:
            return self.tfs.get_workitems(child_ids)
        return []

    @property
    def attachments(self):
        """One ``Attachment`` per ``AttachedFile`` relation"""
        return [Attachment(relation, self.tfs)
                for relation in self.find_in_relation('AttachedFile')]

    def add_relations_raw(self, relations_raw, params=None):
        """
        Add attachments, related (work item) links and/or hyperlinks

        The given relations are deep-copied before being modified, so the
        caller's objects stay untouched. Nothing is sent when the list is
        empty.

        :param relations_raw: List of relations. Each relation is a dict
            with the following keys: rel, url, attributes
        :param params: forwarded unchanged as the third argument of
            ``tfs.update_workitem``
        """
        copy_raw = [deepcopy(relation) for relation in relations_raw]

        # remove ID from attributes as it has to be unique
        for relation in copy_raw:
            attributes = relation.get('attributes')
            if attributes:
                attributes.pop('id', None)

        path = '/relations/-'
        update_data = [dict(op="add", path=path, value=relation) for relation in copy_raw]

        if update_data:
            raw = self.tfs.update_workitem(self.id, update_data, params)
            # Keep ``uri`` across the refresh instead of resetting it to ''.
            self.__init__(raw, self.tfs, self.uri)
class Attachment(TFSObject):
    """
    File attached to a work item, built from an ``AttachedFile`` relation.

    ``id`` is the last path segment of the relation's ``url`` and ``name``
    is taken from ``attributes['name']``.
    """

    def __init__(self, data=None, tfs=None, uri=''):
        super().__init__(data, tfs, uri)
        # Get UUID from url: it is everything after the final slash
        self.id = self.data['url'].rsplit('/', 1)[-1]
        attributes = self.data['attributes']
        self.name = attributes['name']

    def download(self, path='.'):
        """
        Save the attachment under its own name inside ``path``

        :param path: target directory; the file name is appended to it
        """
        target = os.path.join(path, self.name)
        self.tfs.download_file(self.url, target)
class Changeset(TFSObject):
    """
    One changeset; ``id`` is read from the ``changesetId`` key of the JSON.
    """

    def __init__(self, data=None, tfs=None, uri=''):
        super().__init__(data, tfs, uri)
        self.id = self.data['changesetId']

    @property
    def workitems(self):
        """
        Work items linked to this changeset

        :return: result of ``tfs.get_workitems`` for the ids of the objects
            listed under ``tfvc/changesets/<id>/workItems``
        """
        endpoint = 'tfvc/changesets/{}/workItems'.format(self.id)
        linked = self.tfs.get_tfs_object(endpoint)
        return self.tfs.get_workitems([link.id for link in linked])
class Projects(TFSObject):
    """
    One project.
    """

    @property
    def team(self):
        """
        Object fetched from ``projects/<id>/teams``

        :return: whatever ``tfs.get_tfs_object`` returns for that path
        """
        teams_path = 'projects/{}/teams'.format(self.id)
        return self.tfs.get_tfs_object(teams_path)
class TFSQuery(TFSObject):
    """
    Stored query that is executed as soon as the object is created.

    The constructor sends a GET to ``wit/wiql/<id>`` and keeps the answer in
    ``result``. ``columns`` holds the ``referenceName`` and ``column_names``
    the ``name`` of every entry in ``result['columns']``.
    """

    def __init__(self, data=None, tfs=None, uri=''):
        super().__init__(data, tfs, uri)
        self.result = self.tfs.rest_client.send_get('wit/wiql/{}?api-version=2.2'.format(self.id),
                                                    project=True)
        columns = self.result['columns']
        self.columns = tuple(column['referenceName'] for column in columns)
        self.column_names = tuple(column['name'] for column in columns)
        self._workitems = None  # filled on first access of ``workitems``

    @property
    def workitems(self):
        """
        Work items returned by the query, loaded on first access and cached

        :return: result of ``tfs.get_workitems`` for the ids in
            ``result['workItems']``
        """
        # Compare with None, not truthiness: an empty result is a valid
        # cached value and must not trigger a new request on every access.
        if self._workitems is None:
            # Pass a real list (as Wiql.workitem_ids does) rather than a
            # one-shot generator.
            ids = [item['id'] for item in self.result['workItems']]
            self._workitems = self.tfs.get_workitems(ids)
        return self._workitems
class Wiql(TFSObject):
    """
    Work Item Query Language

    Wraps the JSON answer of a WIQL request; ``result`` is the same object
    as ``data``.
    """

    def __init__(self, data=None, tfs=None, uri=''):
        super().__init__(data, tfs, uri)
        self.result = self.data

    @property
    def workitem_ids(self):
        """List of the ``id`` values found in ``data['workItems']``"""
        return [entry['id'] for entry in self.data['workItems']]

    @property
    def workitems(self):
        """Result of ``tfs.get_workitems`` for ``workitem_ids``"""
        ids = self.workitem_ids
        return self.tfs.get_workitems(ids)
class GitRepository(TFSObject):
    """
    Git repository object; adds nothing on top of ``TFSObject``.
    """