#!/usr/bin/env python
# -*- coding: utf-8 -*-

import click
import urllib.request, urllib.parse, urllib.error
import json
import os
import glob
import csv
import datetime
import io
import xlwt
import sys
import yaml
import shutil
import hashlib
import zlib
from pyparsing import Word, nums, alphanums, alphas, oneOf, lineStart, lineEnd, Optional, restOfLine, Literal, ParseException, CaselessLiteral
from urllib.request import URLopener

from pymongo import MongoClient, DESCENDING, ASCENDING

XMLTOJSON = '/usr/local/bin/xml-to-json'  # external XML -> JSON converter binary
#FILEPATH = '/home/_data/preservation/ftpmirrors/bus/bus.gov.ru/GeneralInfo'
LOCALPATH = '/data/ftp/bus.gov.ru/'        # root of the mirrored bus.gov.ru FTP tree
NSI_PATH = os.path.join(LOCALPATH, 'all')  # flat directory holding NSI zip archives

# Data categories: subdirectory names on the bus.gov.ru FTP mirror.
DATA_CATEGORY_GENERAL = 'GeneralInfo'
DATA_CATEGORY_ACTIONGRANT = 'ActionGrant'
DATA_CATEGORY_ACTIVERES = 'ActivityResult'
DATA_CATEGORY_BUDGETCIRC = 'BudgetaryCircumstances'
DATA_CATEGORY_DIVERSEINFO = 'DiverseInfo'
DATA_CATEGORY_FINACTIVITY = 'FinancialActivityPlan'
DATA_CATEGORY_INSACTIVITY = 'InspectionActivity'
DATA_CATEGORY_STATETASK = 'StateTask'
DATA_CATEGORY_REGPUBINFO = 'regPublishedInfo'
DATA_CATEGORY_ABF0503121 = 'annualBalanceF0503121'
DATA_CATEGORY_ABF0503127 = 'annualBalanceF0503127'
DATA_CATEGORY_ABF0503130 = 'annualBalanceF0503130'
DATA_CATEGORY_ABF0503137 = 'annualBalanceF0503137'
DATA_CATEGORY_ABF0503721 = 'annualBalanceF0503721'
DATA_CATEGORY_ABF0503730 = 'annualBalanceF0503730'
DATA_CATEGORY_ABF0503737 = 'annualBalanceF0503737'

# NSI (reference/dictionary) dataset names; archives are named <name>_*.zip.
DATA_NSI_BUDGETS = 'nsiBudget'
DATA_NSI_KLADR = 'nsiKladr'
DATA_NSI_KBK = 'nsiKbk'
DATA_NSI_KBKBUDGET = 'nsiKbkBudget'
DATA_NSI_OKATO = 'nsiOkato'
DATA_NSI_OKTMO = 'nsiOktmo'
DATA_NSI_OKER = 'nsiOker'
DATA_NSI_OKVED = 'nsiOkved'
DATA_NSI_OKFS = 'nsiOkfs'
DATA_NSI_OKOGU = 'nsiOkogu'
DATA_NSI_OKOPF = 'nsiOkopf'
DATA_NSI_SUBJECTSERVICE = 'nsiSubjectService'
DATA_NSI_PPO = 'nsiPpo'
DATA_NSI_OGS = 'nsiOgs'
DATA_NSI_FEDERALSERVICE = 'nsiFederalService'
DATA_NSI_INSTITUTIONTYPE = 'nsiInstitutionType'
DATA_NSI_IT = 'nsiIt'

NSI_LIST = [DATA_NSI_BUDGETS, DATA_NSI_KLADR, DATA_NSI_KBK, DATA_NSI_KBKBUDGET, DATA_NSI_OKATO, 
    DATA_NSI_OKTMO, DATA_NSI_OKER, DATA_NSI_OKVED, DATA_NSI_OKFS,
    DATA_NSI_OKOGU, DATA_NSI_OKOPF, DATA_NSI_SUBJECTSERVICE, DATA_NSI_PPO, DATA_NSI_OGS, DATA_NSI_FEDERALSERVICE,
    DATA_NSI_IT, DATA_NSI_INSTITUTIONTYPE
]

# Maps a data category to the MongoDB collection it is loaded into.
CAT_MAP = {
    DATA_CATEGORY_GENERAL : 'generalinfo',
    DATA_CATEGORY_ACTIONGRANT : 'actiongrant',
    DATA_CATEGORY_ACTIVERES : 'activeres',
    DATA_CATEGORY_BUDGETCIRC : 'budgetcirc',
    DATA_CATEGORY_DIVERSEINFO : 'diverseinfo',
    DATA_CATEGORY_FINACTIVITY : 'finactplan',
    DATA_CATEGORY_INSACTIVITY : 'insactivity',
    DATA_CATEGORY_STATETASK : 'statetask',
    DATA_CATEGORY_REGPUBINFO : 'regpubinfo',
    DATA_CATEGORY_ABF0503121 : 'afb0503121',
    DATA_CATEGORY_ABF0503127 : 'afb0503127',
    DATA_CATEGORY_ABF0503130 : 'afb0503130',
    # Fix: this previously mapped to 'afb0503127' (same as ABF0503127), so
    # two distinct balance forms were written into one collection.
    DATA_CATEGORY_ABF0503137 : 'afb0503137',
    DATA_CATEGORY_ABF0503721 : 'afb0503721',
    DATA_CATEGORY_ABF0503730 : 'afb0503730',
    DATA_CATEGORY_ABF0503737 : 'afb0503737',
}

# Lifecycle states of an archive record in the 'archives' collection.
ARCHIVE_STATUS_UNPROCESSED = 1
ARCHIVE_STATUS_PROCESSING = 2
ARCHIVE_STATUS_PROCESSED = 3


# Scratch directory for unpacking archives (presumably a ramdisk mount).
TMPPATH = '/tmp/ram'

def calc_hash_crc(filename):
    """Return ``{'sha256': <hex digest>, 'crc32': <int checksum>}`` for a file.

    The file is read fully into memory, which is acceptable for these
    archive sizes.

    :param filename: path to the file to checksum
    """
    # Fix: use a context manager — the original leaked the file handle
    # until garbage collection.
    with open(filename, 'rb') as f:
        data = f.read()
    return {'sha256': hashlib.sha256(data).hexdigest(), 'crc32': zlib.crc32(data)}

class MongoLoader:
    """Registers bus.gov.ru zip archives in MongoDB and unpacks their XML
    payloads (converted to JSON via the external xml-to-json tool) into
    per-category collections."""

    def __init__(self, thepath):
        """Connect to the local MongoDB and bind working collections.

        :param thepath: root directory of the mirrored bus.gov.ru data
        """
        self.localpath = thepath
        self.conn = MongoClient()
        self.db = self.conn['bus']
        self.archcoll = self.db['archives']  # registry of seen .zip archives
        self.filecoll = self.db['files']     # NOTE(review): not used below — kept for compatibility

    def buildIndexes(self):
        """Ensure lookup indexes on the archive registry and every data collection."""
        # ensure_index is the legacy pymongo API this script was written
        # against; kept as-is for compatibility with the installed driver.
        self.archcoll.ensure_index([('obj_name', DESCENDING)])
        for name in list(CAT_MAP.values()):
            self.db[name].ensure_index([('position.positionId', DESCENDING)])
        for name in NSI_LIST:
            self.db[name].ensure_index([('position.positionId', DESCENDING)])

    def stats(self):
        """Print per-category archive/record counts and collection sizes as a table."""
        import tabulate
        keys = ['category', 'total_files', 'not_loaded', 'count', 'size']
        table = []
        print('Collecting stats')
        # Fix: iterate sorted() copies instead of sorting in place — the
        # original called NSI_LIST.sort(), mutating a module-level constant
        # as a side effect of printing statistics.
        for cat in sorted(CAT_MAP.keys()):
            row = [cat]
            row.append(self.archcoll.find({'category' : cat}).count())
            row.append(self.archcoll.find({'category' : cat, 'status' : ARCHIVE_STATUS_UNPROCESSED}).count())
            stats = self.db.command("collstats", CAT_MAP[cat])
            row.append(stats['count'])
            row.append(float(stats['size']) / (1024 * 1024))  # size in MB
            table.append(row)
        for cat in sorted(NSI_LIST):
            row = [cat]
            row.append(self.archcoll.find({'category' : cat}).count())
            row.append(self.archcoll.find({'category' : cat, 'status' : ARCHIVE_STATUS_UNPROCESSED}).count())
            stats = self.db.command("collstats", cat)
            row.append(stats['count'])
            row.append(float(stats['size']) / (1024 * 1024))
            table.append(row)
        total = ['total', 0, 0, 0, 0]
        for row in table:
            for i in range(1, 5):
                total[i] += row[i]
        table.append(total)
        print(tabulate.tabulate(table, headers=keys))

    def loadNSI(self):
        """Register every NSI zip archive found under NSI_PATH in the archive registry."""
        for nsiname in NSI_LIST:
            print(nsiname)
            ffilter = NSI_PATH + '/' + nsiname + '_*.zip'
            for pname in glob.glob(ffilter):
                obj_name = pname.rsplit('/', 1)[1]
                obj = self.archcoll.find_one({'obj_name' : obj_name})
                if not obj:
                    obj = {'obj_name' : obj_name, 'filepath' : pname,
                           'category' : nsiname,
                           'filesize' : os.stat(pname).st_size,  # same as stat tuple index 6, but readable
                           'status' : ARCHIVE_STATUS_UNPROCESSED}
                    obj.update(calc_hash_crc(pname))
                    self.archcoll.save(obj)
                    print('-', pname, ':', obj_name, 'saved')
                else:
                    print('-', pname, 'already collected')

    def findArchives(self, category=DATA_CATEGORY_GENERAL, dbupdate=True):
        """Walk <localpath>/<category> and register every .zip archive found.

        :param category: data category (subdirectory name) to scan
        :param dbupdate: accepted for interface compatibility (unused here,
            as in the original implementation)
        """
        print('Collecting source data from', self.localpath)
        pathname = os.path.join(self.localpath, category)
        for dirname, dirnames, filenames in os.walk(pathname):
            for filename in filenames:
                pname = os.path.join(dirname, filename)
                ext = pname.rsplit('.', 1)[-1].lower()
                if ext != 'zip':
                    continue
                # Path layout is <...>/<category>/<region>/<file>.zip;
                # obj_name keeps the last three components as a stable key.
                region = pname.split('/')[-2]
                obj_name = '/'.join(pname.split('/')[-3:])
                obj = self.archcoll.find_one({'obj_name' : obj_name})
                if not obj:
                    obj = {'obj_name' : obj_name, 'filepath' : pname,
                           'category' : category, 'region' : region,
                           'filesize' : os.stat(pname).st_size,
                           'status' : ARCHIVE_STATUS_UNPROCESSED}
                    obj.update(calc_hash_crc(pname))
                    self.archcoll.save(obj)
                    print('-', pname, ':', obj_name, 'saved')
                else:
                    print('-', pname, 'already collected')

    def process_category(self, category, force=False, nocheck=False):
        """Process every registered archive of a category.

        :param category: category whose archives to process
        :param force: when True, reprocess archives already marked processed
        :param nocheck: skip the per-file duplicate lookup
        """
        if not force:
            objects = self.archcoll.find({'category' : category, 'status' : {'$in' : [ARCHIVE_STATUS_UNPROCESSED, ARCHIVE_STATUS_PROCESSING]}})
            print('Total unprocessed archives:', objects.count())
        else:
            objects = self.archcoll.find({'category' : category})
            print('Total archives', objects.count())
        # Materialize the cursor before processing: process_archive writes to
        # the same collection the cursor reads from. (Local renamed from
        # 'all', which shadowed the builtin.)
        pending = list(objects)
        for o in pending:
            self.process_archive(o, force=force, nocheck=nocheck)

    def process_archive(self, o, force=False, nocheck=False):
        """Unpack one archive into a scratch directory and load each file.

        :param o: archive registry document (must have 'filepath', 'status')
        """
        if o['status'] == ARCHIVE_STATUS_UNPROCESSED:
            nocheck = True   # first pass: nothing in the DB to re-check
        elif o['status'] == ARCHIVE_STATUS_PROCESSING:
            nocheck = False  # interrupted run: re-check what was already loaded
        pname = o['filepath']
        name = pname.rsplit('/', 1)[-1]
        workdir = os.path.join(TMPPATH, name)  # ('finndame' typo renamed)
        try:
            os.mkdir(workdir)
        except OSError:
            pass  # left over from a previous interrupted run — reuse it
        o['status'] = ARCHIVE_STATUS_PROCESSING
        shutil.copy(pname, os.path.join(workdir, name))
        cwd = os.getcwd()
        os.chdir(workdir)
        try:
            os.system('7za x -y %s 1> /dev/null 2>/dev/null' % (name))
            files = os.listdir(workdir)
            allfiles = []
            for fname in files:
                allfiles.append({'fname' : fname})
                # abspath() resolves relative to workdir because of the chdir above.
                self.process_file(o['category'], o['region'] if 'region' in o else None,
                                  name, os.path.abspath(fname), nocheck=nocheck, force=force)
            o['status'] = ARCHIVE_STATUS_PROCESSED
            o['files'] = allfiles
            o['nfiles'] = len(files)
            self.archcoll.save(o)
        finally:
            # Fix: restore the working directory even if processing raises,
            # so subsequent relative-path work is not silently broken.
            os.chdir(cwd)
        shutil.rmtree(workdir)
        print(pname, 'processed')

    def process_file(self, category, region, archivename, filename, nocheck=False, force=True):
        """Convert one extracted XML file to JSON and store it in its collection.

        Non-XML files are ignored. Documents get a 'processed' sub-document
        recording their provenance (category/region/archive/xml path).
        """
        collname = self.map_category(category)
        coll = self.db[collname]
        if '.' not in filename:
            return  # guard: no extension — rsplit unpack below would raise
        name, ext = filename.rsplit('.', 1)
        if ext != 'xml':
            return
        if region:
            doc_id = '/'.join([category, region, archivename, filename.rsplit('/', 1)[-1]])
        else:
            doc_id = '/'.join([category, archivename, filename.rsplit('/', 1)[-1]])
            nocheck = True  # NSI files (no region) are never re-checked
        if nocheck:
            data = None
        else:
            data = coll.find_one({'processed.id' : doc_id})  # ('id' local renamed: shadowed builtin)
        found = data is not None
        if data and not force:
            return

        os.system(XMLTOJSON + ' -t ns2:position %s > %s' % (filename, name + '.json'))
        # Fix: the original used file(...), which does not exist in Python 3,
        # and never closed the handle; open() with a context manager does both.
        objects = []
        with open(name + '.json', 'r') as f:
            if region:
                try:
                    objects.append(json.loads(f.read()))
                except ValueError:
                    return  # converter produced invalid JSON — skip this file
            else:
                # NSI conversions emit one JSON document per line.
                for line in f:
                    objects.append(json.loads(line))
        if len(objects) == 1 and not nocheck:
            # Merge into the existing document (if any) to preserve prior fields.
            if data is None:
                data = objects[0]
            else:
                data.update(objects[0])
            data['processed'] = {'id': doc_id, 'category' : category, 'region' : region,
                                 'xml' : filename, 'archive': archivename}
            coll.save(data)
        else:
            for obj in objects:
                obj['processed'] = {'id': doc_id, 'category' : category, 'region' : region,
                                    'xml' : filename, 'archive': archivename}
                coll.save(obj)
        if found:
            print('-', name, 'updated')
        else:
            print('-', name, 'saved')

    def map_category(self, category):
        """Return the collection name for a category, or None if unknown.

        Data categories map through CAT_MAP; NSI categories map to themselves.
        """
        v = CAT_MAP.get(category)
        if v is not None:
            return v
        return category if category in NSI_LIST else None

    def reset(self):
        """Drop the archive registry and every CAT_MAP data collection.

        NOTE(review): NSI collections are not dropped here (matching the
        original behavior) — confirm whether that is intentional.
        """
        print('Reset database')
        self.archcoll.drop()
        print('archive collection dropped')
        for name in list(CAT_MAP.values()):
            print(name, 'collection dropped')
            self.db[name].drop()


@click.group()
def cli1():
    # Empty group: exists only to host the 'listdata' command so that
    # CommandCollection at the bottom of the file can merge it. Kept without
    # a docstring — a group docstring would change the CLI help output.
    pass

@cli1.command()
@click.option('--cat', default=DATA_CATEGORY_GENERAL)
def listdata(cat):
    """Lists all source files"""
    # Scan the mirror for archives of the requested category and
    # register them in the archive collection.
    ml = MongoLoader(LOCALPATH)
    ml.buildIndexes()
    ml.findArchives(cat)

@click.group()
def cli2():
    # Empty group hosting the 'process' command; merged by CommandCollection.
    # No docstring on purpose: it would alter the CLI help text.
    pass

@cli2.command()
@click.option('--cat', default=DATA_CATEGORY_GENERAL)
@click.option('--force', default=False)
@click.option('--nocheck', default=False)
def process(cat, force, nocheck):
    """Process archive files"""
    # Coerce the option values to real booleans before delegating.
    ml = MongoLoader(LOCALPATH)
    ml.buildIndexes()
    ml.process_category(cat, nocheck=bool(nocheck), force=bool(force))


@click.group()
def cli3():
    # Empty group hosting the 'reset' command; merged by CommandCollection.
    # No docstring on purpose: it would alter the CLI help text.
    pass

@cli3.command()
def reset():
    """Drop databases"""
    # Destructive: drops the archive registry and all data collections.
    MongoLoader(LOCALPATH).reset()


@click.group()
def cli4():
    # Empty group hosting the 'stats' command; merged by CommandCollection.
    # No docstring on purpose: it would alter the CLI help text.
    pass

@cli4.command()
def stats():
    """Statistics"""
    # Print a per-category table of archive and record counts.
    ml = MongoLoader(LOCALPATH)
    ml.buildIndexes()
    ml.stats()

@click.group()
def cli5():
    # Empty group hosting the 'loadnsi' command; merged by CommandCollection.
    # No docstring on purpose: it would alter the CLI help text.
    pass

@cli5.command()
def loadnsi():
    """Load all NSI data"""
    # Register every NSI reference-dictionary archive found under NSI_PATH.
    MongoLoader(LOCALPATH).loadNSI()


# Merge the five single-command groups into one CLI entry point.
cli = click.CommandCollection(sources=[cli1, cli2, cli3, cli4, cli5])

if __name__ == '__main__':
    cli()