obidmscolumn.c 64.2 KB
Newer Older
Celine Mercier committed
1
/****************************************************************************
 * OBIDMS columns functions                                                 *
 ****************************************************************************/

/**
 * @file obidmscolumn.c
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 * @date 22 May 2015
 * @brief Functions shared by all the OBIDMS columns.
 */


#include <stdlib.h>
14
#include <stdio.h>
15
#include <string.h>
16
#include <sys/types.h>
17
#include <dirent.h>
18
#include <unistd.h>
19
#include <time.h>
Eric Coissac committed
20
#include <fcntl.h>
21
#include <stdbool.h>
22
#include <math.h>
23
#include <sys/mman.h>
Celine Mercier committed
24

25
#include "obidmscolumn.h"
26
#include "obidmscolumn_idx.h"
27
#include "obidmscolumndir.h"
28 29 30
#include "obidms.h"
#include "obitypes.h"
#include "obierrno.h"
31
#include "obidebug.h"
32
#include "obilittlebigman.h"
Celine Mercier committed
33
#include "obiblob_indexer.h"
34
#include "utils.h"
35
#include "libjson/json_utils.h"
36

37

38
#define DEBUG_LEVEL 0	// TODO has to be defined somewhere else (cython compil flag?)
39

Celine Mercier committed
40

41
/**************************************************************************
 *
 * D E C L A R A T I O N   O F   T H E   P R I V A T E   F U N C T I O N S
 *
 **************************************************************************/
Eric Coissac committed
46 47 48 49 50


/**
 * @brief Internal function building the file name for a column.
 *
 * The function builds the file name corresponding to a column of an OBIDMS.
 *
 * @warning The returned pointer has to be freed by the caller.
 *
 * @param column_name The name of the OBIDMS column file.
 * @param version_number The version number of the OBIDMS column file.
 *
 * @returns A pointer to the column file name.
 * @retval NULL if an error occurred.
 *
 * @since May 2015
 * @author Eric Coissac (eric.coissac@metabarcoding.org)
 */
64
static char* build_column_file_name(const char* column_name, obiversion_t version_number);
65

Eric Coissac committed
66 67

/**
 * @brief Internal function building the file name for a column version file.
 *
 * The column version file indicates the latest version number for a column.
 * This function returns the name of the file storing this information.
 *
 * @warning The returned pointer has to be freed by the caller.
 *
 * @param column_name The name of the OBIDMS column.
 *
 * @returns A pointer to the version file name.
 * @retval NULL if an error occurred.
 *
 * @since May 2015
 * @author Eric Coissac (eric.coissac@metabarcoding.org)
 */
83
static char* build_version_file_name(const char* column_name);
Eric Coissac committed
84

85

Eric Coissac committed
86 87
/**
 * @brief Internal function returning a new column version number
 *        in the OBIDMS database.
 *
 * The version number stored in the version file of the column is incremented
 * and written back to the file. If the version file does not exist yet, it is
 * created by create_version_file() and `0` is returned.
 *
 * @param column_directory A pointer as returned by obi_create_column_directory() or obi_open_column_directory().
 * @param block Whether the call is blocking or not:
 *                  - `true` the call is blocking
 *                  - `false` the call is not blocking.
 *
 * @returns The next version number for this column.
 * @retval -1 if an error occurred.
 *
 * @since May 2015
 * @author Eric Coissac (eric.coissac@metabarcoding.org)
 */
101
static obiversion_t obi_get_new_version_number(OBIDMS_column_directory_p column_directory, bool block);
Eric Coissac committed
102

103

Eric Coissac committed
104 105
/**
 * @brief Internal function creating a new column version file
 *        in the OBIDMS database.
 *
 * The new file is initialized with the minimum version number `0`.
 * The creation fails if the version file already exists.
 *
 * @param column_directory A pointer as returned by obi_create_column_directory() or obi_open_column_directory().
 *
 * @returns The next usable version number for this column : `0`.
 * @retval -1 if an error occurred.
 *
 * @since May 2015
 * @author Eric Coissac (eric.coissac@metabarcoding.org)
 */
118
static obiversion_t create_version_file(OBIDMS_column_directory_p column_directory);
Eric Coissac committed
119

120

121 122
/**
 * @brief Internal function building the default elements names of the lines of a
 *        column, with ';' as separator (i.e. "0;1;2;...;n-1\0" for n elements per line).
 *
 * @warning The returned pointer has to be freed by the caller.
 *
 * @param nb_elements_per_line The number of elements per line in the column.
 *
 * @returns A pointer on the elements names.
 * @retval NULL if an error occurred.
 *
 * @since December 2016
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
135
static char* build_default_elements_names(index_t nb_elements_per_line);
136 137


138 139 140 141 142 143 144 145 146
/**
 * @brief Internal function formatting the elements names of the lines of a
 *        column with '\0' as separator (e.g. "0\01\02\0...\0n\0").
 *
 * @param elements_names The character string formatted with ';' as separator (e.g. "0;1;2;...;n\0").
 *
 * @since January 2017
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
147
static void format_elements_names(char* elements_names);
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184


/**
 * @brief Internal function comparing two element names using their sorted index, using data stored in the column header.
 *
 * @param n1_sort_idx A pointer on the sorted index of the first name.
 * @param n2_sort_idx A pointer on the sorted index of the second name.
 * @param h A pointer on the column header.
 *
 * @returns A value < 0 if name1 < name2,
 * 			a value > 0 if name1 > name2,
 * 			and 0 if name1 == name2.
 *
 * @since January 2017
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
static int cmp_elements_names_with_idx(const void* n1_sort_idx, const void* n2_sort_idx, const void* h);


/**
 * @brief Internal function comparing two element names using a pointer on the first name and the sorted index of the second name,
 * 		  using data stored in the column header.
 *
 * @param name1 A pointer on the first name.
 * @param n2_sort_idx A pointer on the sorted index of the second name.
 * @param h A pointer on the column header.
 *
 * @returns A value < 0 if name1 < name2,
 * 			a value > 0 if name1 > name2,
 * 			and 0 if name1 == name2.
 *
 * @since January 2017
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
static int cmp_elements_names_with_name_and_idx(const void* name1, const void* n2_sort_idx, const void* h);


185 186
/**
 * @brief Internal function setting the elements names of the lines of a
 *        column in the header of the OBIDMS column structure.
 *
 * @param column A pointer as returned by obi_create_column().
 * @param elements_names The names of the elements as formatted by format_elements_names().
 * @param elts_names_length The length of elements_names including the two terminal '\0's.
 * @param nb_elements_per_line The number of elements names, used as the size of the names index
 *                             stored in the header right after the names themselves.
 *
 * @retval 0 if the operation was successfully completed.
 * @retval -1 if an error occurred.
 *
 * @since July 2015
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
199 200 201 202 203 204 205 206 207 208 209 210 211
static int set_elements_names(OBIDMS_column_p column, char* elements_names, int64_t elts_names_length, index_t nb_elements_per_line);


/**
 * @brief Internal function reading the informations related to the elements names
 *        of the lines of a column in the header of the OBIDMS column structure.
 *
 * @param header A pointer on the header of the column.
 *
 * @since December 2017
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
static void read_elt_names_informations(OBIDMS_column_header_p header);
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238


/**
 * @brief Internal function counting the number of elements names in a character array.
 *
 * @param elements_names A pointer on the character string corresponding to the elements names,
 *                       formatted with ';' or with '\0' as separator.
 * @param elt_names_formatted Whether the separator is ';' (false), or '\0' (true, as formatted by format_elements_names()).
 *
 * @returns The number of elements names in the character array.
 *
 * @since January 2017
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
static index_t check_elt_names_count(const char* elements_names, bool elt_names_formatted);


/**
 * @brief Internal function computing the length of a character array containing elements names as formatted by format_elements_names().
 *
 * @param elements_names A pointer on the character string corresponding to the elements names as formatted by format_elements_names().
 * @param nb_elements The number of elements names stored in the character array.
 *
 * @returns The length of the character array, including the two terminal '\0's.
 *
 * @since January 2017
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
239
static int get_formatted_elt_names_length(const char* elements_names, index_t nb_elements);
240 241


242
/**
 * @brief Internal function computing how many lines of an OBIDMS column
 *        fit in a memory page.
 *
 * @param data_type The data OBIType.
 * @param nb_elements_per_line The number of elements per line.
 *
 * @returns The line count for one memory page.
 *
 * @since September 2015
 * @author Celine Mercier (celine.mercier@metabarcoding.org)
 */
254
static index_t get_line_count_per_page(OBIType_t data_type, index_t nb_elements_per_line);
255 256


257
/************************************************************************
 *
 * D E F I N I T I O N   O F   T H E   P R I V A T E   F U N C T I O N S
 *
 ************************************************************************/
Eric Coissac committed
262

263

264
// Builds the name of a column file: "<column_name>@<version_number>.odc".
// The returned string is heap-allocated and has to be freed by the caller.
// Returns NULL (with the OBI error number set) if the name could not be built.
static char* build_column_file_name(const char* column_name, obiversion_t version_number)
{
	char* file_name;
	int   name_length;

	// Let snprintf compute the exact length of the name, so that the buffer size
	// is right whatever the version number is (including negative values)
	name_length = snprintf(NULL, 0, "%s@%d.odc", column_name, version_number);
	if (name_length < 0)
	{
		obi_set_errno(OBICOL_MEMORY_ERROR);
		obidebug(1, "\nError building a column file name");
		return NULL;
	}

	// +1 for the terminal '\0'
	file_name = (char*) malloc(((size_t) name_length + 1) * sizeof(char));
	if (file_name == NULL)
	{
		obi_set_errno(OBI_MALLOC_ERROR);
		obidebug(1, "\nError allocating the memory for a column file name");
		return NULL;
	}

	// Build the file name
	if (snprintf(file_name, (size_t) name_length + 1, "%s@%d.odc", column_name, version_number) != name_length)
	{
		obi_set_errno(OBICOL_MEMORY_ERROR);
		obidebug(1, "\nError building a column file name");
		free(file_name);	// Do not leak the buffer on failure
		return NULL;
	}

	return file_name;
}


290
static char* build_version_file_name(const char* column_name)
291
{
292
	char* file_name;
Eric Coissac committed
293

294
	// Build the file name
Celine Mercier committed
295
	file_name =	(char*) malloc((strlen(column_name) + 5)*sizeof(char));
296 297 298 299 300 301 302
	if (file_name == NULL)
	{
		obi_set_errno(OBI_MALLOC_ERROR);
		obidebug(1, "\nError allocating the memory for a version file name");
		return NULL;
	}

Celine Mercier committed
303
	if (sprintf(file_name,"%s.odv", column_name) < 0)
Eric Coissac committed
304
	{
305
		obi_set_errno(OBICOL_MEMORY_ERROR);
306
		obidebug(1, "\nError building a version file name");
Eric Coissac committed
307 308 309
		return NULL;
	}

310
	return file_name;
Eric Coissac committed
311 312
}

313

314
// Increments the version number stored in the version file of a column and returns
// the new value. The read-increment-write sequence is done while holding a lockf()
// lock on the first sizeof(obiversion_t) bytes of the file; the lock is requested
// with F_LOCK when 'block' is true and with F_TLOCK otherwise.
// If the version file does not exist yet, it is created by create_version_file()
// and the value returned by that function is returned.
// Returns -1 (with the OBI error number set) if an error occurred.
static obiversion_t obi_get_new_version_number(OBIDMS_column_directory_p column_directory, bool block)
{
	off_t 			loc_size;
	obiversion_t 	new_version_number;
	char* 			version_file_name;
	int    			version_file_descriptor;
	int 			lock_mode;
	int 			open_errno;

	new_version_number = 0;
	loc_size = sizeof(obiversion_t);

	// Select the correct lockf operation according to the blocking mode
	lock_mode = block ? F_LOCK : F_TLOCK;

	// Build the version file name
	version_file_name = build_version_file_name(column_directory->column_name);
	if (version_file_name == NULL)
		return -1;

	// Open the version file. The name is not needed afterwards, so it is freed
	// right away on every path; errno is saved first because it is tested below
	version_file_descriptor = openat(column_directory->dir_fd, version_file_name, O_RDWR);
	open_errno = errno;
	free(version_file_name);

	if (version_file_descriptor < 0)
	{
		// No version file yet: create it
		if (open_errno == ENOENT)
			return create_version_file(column_directory);

		errno = open_errno;
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError opening a version file");
		return -1;
	}

	// Test if the version file size is ok
	if (lseek(version_file_descriptor, 0, SEEK_END) < loc_size)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError testing if a version file size is ok");
		goto error_close;
	}

	// Reset offset to 0: lockf() locks a section starting at the current offset
	if (lseek(version_file_descriptor, 0, SEEK_SET) != 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError positioning offset in version file");
		goto error_close;
	}

	// Lock the file
	if (lockf(version_file_descriptor, lock_mode, loc_size) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError locking a version file");
		goto error_close;
	}

	// Read the current version number
	if (read(version_file_descriptor, &new_version_number, sizeof(obiversion_t)) < ((ssize_t) sizeof(obiversion_t)))
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError reading a version file");
		goto error_close;
	}

	new_version_number++;

	// Reset offset to 0 to write the new version number
	if (lseek(version_file_descriptor, 0, SEEK_SET) != 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError positioning offset in version file");
		goto error_close;
	}

	// Write the new version number
	if (write(version_file_descriptor, &new_version_number, sizeof(obiversion_t)) < ((ssize_t) sizeof(obiversion_t)))
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError writing a new version number in a version file");
		goto error_close;
	}

	// Reset offset to 0: the write moved the offset, and lockf() unlocks a section
	// starting at the current offset, so the offset has to be back where the lock started
	if (lseek(version_file_descriptor, 0, SEEK_SET) != 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError positioning offset in version file");
		goto error_close;
	}

	// Unlock the file
	if (lockf(version_file_descriptor, F_ULOCK, loc_size) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError unlocking a version file");
		goto error_close;
	}

	if (close(version_file_descriptor) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError closing a version file");
		return -1;
	}

	return new_version_number;

error_close:
	// Closing the descriptor also releases the lock if it is held
	close(version_file_descriptor);
	return -1;
}

437

438
// Creates the version file of a column and stores the version number 0 in it.
// The file is opened with O_CREAT | O_EXCL, so the creation fails if the file
// already exists. The write is done while holding a lockf() lock on the first
// sizeof(obiversion_t) bytes of the file.
// Returns the version number written (0), or -1 if an error occurred.
static obiversion_t create_version_file(OBIDMS_column_directory_p column_directory)
{
	const obiversion_t first_version = 0;
	const off_t        record_size   = sizeof(obiversion_t);
	char*              file_name;
	int                fd;

	file_name = build_version_file_name(column_directory->column_name);
	if (file_name == NULL)
		return -1;

	// Get the file descriptor associated to the version file
	fd = openat(column_directory->dir_fd, file_name, O_RDWR | O_CREAT | O_EXCL, 0777);
	if (fd < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError opening a version file");
		free(file_name);
		return -1;
	}

	// The name is only needed to open the file
	free(file_name);

	// Lock the file
	if (lockf(fd, F_LOCK, record_size) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError locking a version file");
		goto abort_and_close;
	}

	// Give the version file its final size
	if (ftruncate(fd, record_size) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError truncating a version file");
		goto abort_and_close;
	}

	// Make sure the write starts at the beginning of the file
	if (lseek(fd, 0, SEEK_SET) != 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError changing offset of a version file");
		goto abort_and_close;
	}

	// Store the initial version number
	if (write(fd, &first_version, sizeof(obiversion_t)) < ((ssize_t) sizeof(obiversion_t)))
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError writing version number in a version file");
		goto abort_and_close;
	}

	// The write moved the offset; lockf() works on a section starting at the
	// current offset, so go back to the start of the locked section before unlocking
	if (lseek(fd, 0, SEEK_SET) != 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError preparing a version file for unlocking");
		goto abort_and_close;
	}

	// Unlock the file
	if (lockf(fd, F_ULOCK, record_size) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError unlocking a version file");
		goto abort_and_close;
	}

	if (close(fd) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError closing a version file");
		return -1;
	}

	return first_version;

abort_and_close:
	close(fd);
	return -1;
}

528

529

530 531 532 533
// Builds the default elements names of a column: the indices of the elements
// separated by ';' ("0;1;2;...;n-1" for n elements per line).
// The returned string is heap-allocated and has to be freed by the caller.
// Returns NULL (with the OBI error number set) if nb_elements_per_line is lower
// than 1 or if the names could not be built.
static char* build_default_elements_names(index_t nb_elements_per_line)
{
	char*   elements_names;
	index_t i;
	index_t remaining;
	size_t  max_digits;
	size_t  buffer_size;
	size_t  len;
	int     written;

	// At least one name is needed: with no name there would be no trailing ';' to replace below
	if (nb_elements_per_line < 1)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError building default elements names: the number of elements per line must be at least 1");
		return NULL;
	}

	// Number of digits of the longest name (the last index)
	max_digits = 1;
	for (remaining = nb_elements_per_line - 1; remaining >= 10; remaining /= 10)
		max_digits++;

	// Each name takes at most max_digits characters + 1 separator; +1 for the '\0' written by snprintf
	if ((size_t) nb_elements_per_line > (((size_t) -1) - 1) / (max_digits + 1))
	{
		obi_set_errno(OBI_MALLOC_ERROR);
		obidebug(1, "\nError allocating memory for elements names");
		return NULL;
	}
	buffer_size = ((size_t) nb_elements_per_line) * (max_digits + 1) + 1;

	elements_names = (char*) malloc(buffer_size * sizeof(char));
	if (elements_names == NULL)
	{
		obi_set_errno(OBI_MALLOC_ERROR);
		obidebug(1, "\nError allocating memory for elements names");
		return NULL;
	}

	len = 0;
	for (i = 0; i < nb_elements_per_line; i++)
	{
		written = snprintf(elements_names+len, buffer_size-len, "%lld;", (long long) i);
		if ((written < 0) || ((size_t) written >= buffer_size-len))
		{
			obi_set_errno(OBICOL_UNKNOWN_ERROR);
			obidebug(1, "\nError building default elements names");
			free(elements_names);
			return NULL;
		}
		len += (size_t) written;
	}

	// Terminal character
	elements_names[len-1] = '\0';	// -1 to delete last ';'

	return elements_names;
}


564

565
// Formats elements names in place: every ';' separator found before the
// terminal '\0' of the string is replaced with '\0'.
static void format_elements_names(char* elements_names)
{
	char* cursor;

	// Walk up to the original terminal '\0': the characters replaced so far are
	// always behind the cursor, so they never end the walk early
	for (cursor = elements_names; *cursor != '\0'; cursor++)
	{
		if (*cursor == ';')
			*cursor = '\0';
	}
}



static int cmp_elements_names_with_idx(const void* n1_sort_idx, const void* n2_sort_idx, const void* h)
{
	char* name1=NULL;
	char* name2=NULL;

587 588
	index_t name1_idx;
	index_t name2_idx;
589

590 591
	index_t name1_sort_idx = *((index_t*)n1_sort_idx);
	index_t name2_sort_idx = *((index_t*)n2_sort_idx);
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
	OBIDMS_column_header_p header = (OBIDMS_column_header_p) h;

	name1_idx = (header->elements_names_idx)[name1_sort_idx];
	name1 = (header->elements_names)+name1_idx;

	name2_idx = (header->elements_names_idx)[name2_sort_idx];
	name2 = (header->elements_names)+name2_idx;

	return strcmp(name1, name2);
}



static int cmp_elements_names_with_name_and_idx(const void* name1, const void* n2_sort_idx, const void* h)
{
607 608
	char*   name2=NULL;
	index_t name2_idx;
609

610
	index_t name2_sort_idx = *((index_t*)n2_sort_idx);
611 612 613 614 615 616 617 618 619
	OBIDMS_column_header_p header = (OBIDMS_column_header_p) h;

	name2_idx = (header->elements_names_idx)[name2_sort_idx];
	name2 = (header->elements_names)+name2_idx;

	return strcmp(name1, name2);
}


620
// Stores the elements names in the memory arena of the column header and builds
// their indices. The arena is laid out as follows:
//   - the names, separated by '\0' and ended by two '\0' (elts_names_length characters),
//   - the names index: for each name, its offset in the names (nb_elements values),
//   - the sorted index: positions in the names index, sorted by name.
// 'elements_names' has to be formatted as done by format_elements_names().
// Returns 0 on success, or -1 (with the OBI error number set and nothing written
// in the header) if the arguments are invalid or if 'elements_names' contains
// more names than nb_elements.
// NOTE(review): the names index starts elts_names_length bytes after the start of
// the arena, so its alignment depends on that length - confirm this is acceptable
// on the targeted platforms.
static int set_elements_names(OBIDMS_column_p column, char* elements_names, int64_t elts_names_length, index_t nb_elements)
{
	OBIDMS_column_header_p header;
	index_t                i, j;
	index_t                name_count;

	// The length includes the two terminal '\0's and at least one name gets indexed
	if ((column == NULL) || (column->header == NULL) || (elements_names == NULL) ||
			(elts_names_length < 2) || (nb_elements < 1))
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError setting the elements names of a column: invalid arguments");
		return -1;
	}

	// Count the names that will be indexed (same scan as the one building the index
	// below), to make sure that they fit in the nb_elements slots of each index
	name_count = 1;
	for (i = 1; i < elts_names_length-2; i++)
	{
		if (elements_names[i] == '\0')
			name_count++;
	}
	if (name_count > nb_elements)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError setting the elements names of a column: more names than elements");
		return -1;
	}

	header = column->header;

	// Store the length of the character array containing the elements names
	header->elements_names_length = elts_names_length;
	// Store the pointers pointing to the different elements stored in the memory arena
	header->elements_names = (char*)&(header->mem_arena)[0];
	header->elements_names_idx = (index_t*)&((char*)(header->mem_arena) + elts_names_length)[0];
	header->sorted_elements_idx = (header->elements_names_idx) + nb_elements;

	// Copy the elements names in the header
	memcpy(header->elements_names, elements_names, (elts_names_length-2)*sizeof(char));

	// Terminal characters
	header->elements_names[elts_names_length - 2] = '\0';
	header->elements_names[elts_names_length - 1] = '\0';

	// Build the elements names index, starting with the first element name
	(header->elements_names_idx)[0] = 0;
	(header->sorted_elements_idx)[0] = 0;
	j = 1;

	for (i = 1; i < elts_names_length-2; i++)
	{
		if (elements_names[i] == '\0')
		{	// Index new element name, which starts right after the separator
			(header->elements_names_idx)[j] = i+1;
			(header->sorted_elements_idx)[j] = j;
			j++;
		}
	}

	// Build the sorted index
	qsort_user_data(header->sorted_elements_idx, j, sizeof(index_t), column->header, cmp_elements_names_with_idx);

	return 0;
}

667

668

669 670 671 672 673 674 675 676 677 678 679 680
// Sets the pointers of a column header on the elements names data stored in its
// memory arena: the names come first, then the names index (which starts
// elements_names_length characters after the start of the arena), then the
// sorted index (which starts nb_elements_per_line values after the names index).
static void read_elt_names_informations(OBIDMS_column_header_p header)
{
	char* arena;

	arena = (char*)&(header->mem_arena)[0];

	header->elements_names = arena;
	header->elements_names_idx = (index_t*)(arena + header->elements_names_length);
	header->sorted_elements_idx = (header->elements_names_idx) + (header->nb_elements_per_line);
}



681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
// Counts the elements names stored in a character array.
// If elt_names_formatted is true, the names are separated by FORMATTED_ELT_NAMES_SEPARATOR
// and the array ends with two consecutive '\0'; otherwise the names are separated by
// NOT_FORMATTED_ELT_NAMES_SEPARATOR and the array ends with a single '\0'.
// Every separator and every '\0' met up to the end of the array counts for one name.
static index_t check_elt_names_count(const char* elements_names, bool elt_names_formatted)
{
	char    separator;
	char    current;
	bool    at_end;
	int     pos;
	index_t nb_names = 0;

	separator = elt_names_formatted ? FORMATTED_ELT_NAMES_SEPARATOR : NOT_FORMATTED_ELT_NAMES_SEPARATOR;

	for (pos = 0; ; pos++)
	{
		current = elements_names[pos];

		// The next character is only read when the current one is '\0'
		if (elt_names_formatted)
			at_end = (current == '\0') && (elements_names[pos+1] == '\0');
		else
			at_end = (current == '\0');

		if ((current == separator) || (current == '\0'))
			nb_names++;

		if (at_end)
			break;
	}

	return nb_names;
}



708
// Computes the length of a character array storing nb_elements names separated
// by '\0': the array is scanned until nb_elements '\0' have been met, and one
// more character is counted for the second terminal '\0'.
static int get_formatted_elt_names_length(const char* elements_names, index_t nb_elements)
{
	int     pos;
	index_t nb_names_ended = 0;

	for (pos = 0; nb_names_ended < nb_elements; pos++)
	{
		if (elements_names[pos] == '\0')
			nb_names_ended++;
	}

	// pos is now right after the '\0' ending the last name
	return pos+1;
}



// Computes how many lines of a column fit in one memory page: the page size
// divided by the size of one element, then by the number of elements per line.
// NOTE(review): nb_elements_per_line (and the value returned by obi_sizeof())
// must not be 0, no check is done here - confirm against the callers.
static index_t get_line_count_per_page(OBIType_t data_type, index_t nb_elements_per_line)
{
	return getpagesize()
			/ obi_sizeof(data_type)
			/ nb_elements_per_line;
}

730

731
/**********************************************************************
 *
 * D E F I N I T I O N   O F   T H E   P U B L I C   F U N C T I O N S
 *
 **********************************************************************/

737

738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
// Builds the path of the version file of a column:
// "<column directory name>/<version file name>" is built, then handed to
// obi_dms_get_full_path() whose result is returned as is.
// Returns NULL if one of the names could not be built or if memory could not be
// allocated. All the intermediate strings are freed on every path.
char* obi_version_file_full_path(OBIDMS_p dms, const char* column_name)
{
	char*  version_file_name;
	char*  column_dir_name;
	char*  relative_path;
	char*  full_path;
	size_t relative_path_size;

	version_file_name = build_version_file_name(column_name);
	if (version_file_name == NULL)
		return NULL;

	column_dir_name = obi_build_column_directory_name(column_name);
	if (column_dir_name == NULL)
	{
		free(version_file_name);
		return NULL;
	}

	// Directory name + '/' + file name + terminal '\0'
	relative_path_size = strlen(version_file_name) + strlen(column_dir_name) + 2;
	relative_path = (char*) malloc(relative_path_size * sizeof(char));
	if (relative_path == NULL)
	{
		obi_set_errno(OBI_MALLOC_ERROR);
		obidebug(1, "\nError allocating the memory for the path of a version file");
		free(version_file_name);
		free(column_dir_name);
		return NULL;
	}

	snprintf(relative_path, relative_path_size, "%s/%s", column_dir_name, version_file_name);

	// Build path relative to DMS
	full_path = obi_dms_get_full_path(dms, relative_path);

	free(version_file_name);
	free(column_dir_name);
	free(relative_path);

	return full_path;
}


// Builds the path of the file of a column version:
// "<column directory name>/<column file name>" is built, then handed to
// obi_dms_get_full_path() whose result is returned as is.
// Returns NULL if one of the names could not be built or if memory could not be
// allocated. All the intermediate strings are freed on every path.
char* obi_column_full_path(OBIDMS_p dms, const char* column_name, obiversion_t version_number)
{
	char*  column_file_name;
	char*  column_dir_name;
	char*  relative_path;
	char*  full_path;
	size_t relative_path_size;

	column_file_name = build_column_file_name(column_name, version_number);
	if (column_file_name == NULL)
		return NULL;

	column_dir_name = obi_build_column_directory_name(column_name);
	if (column_dir_name == NULL)
	{
		free(column_file_name);
		return NULL;
	}

	// Directory name + '/' + file name + terminal '\0'
	relative_path_size = strlen(column_file_name) + strlen(column_dir_name) + 2;
	relative_path = (char*) malloc(relative_path_size * sizeof(char));
	if (relative_path == NULL)
	{
		obi_set_errno(OBI_MALLOC_ERROR);
		obidebug(1, "\nError allocating the memory for the path of a column file");
		free(column_file_name);
		free(column_dir_name);
		return NULL;
	}

	snprintf(relative_path, relative_path_size, "%s/%s", column_dir_name, column_file_name);

	// Build path relative to DMS
	full_path = obi_dms_get_full_path(dms, relative_path);

	free(column_file_name);
	free(column_dir_name);
	free(relative_path);

	return full_path;
}


803
// Reads the latest version number of a column from its version file.
// The file is opened read-only and no lock is taken on it.
// Returns the version number read, or -1 if an error occurred.
obiversion_t obi_get_latest_version_number(OBIDMS_column_directory_p column_directory)
{
	const off_t  record_size = sizeof(obiversion_t);
	obiversion_t stored_version = 0;
	char*        file_name;
	int          fd;

	file_name = build_version_file_name(column_directory->column_name);
	if (file_name == NULL)
		return -1;

	// Get the file descriptor associated to the version file
	fd = openat(column_directory->dir_fd, file_name, O_RDONLY);
	if (fd < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError opening a version file");
		free(file_name);
		return -1;
	}

	// The name is only needed to open the file
	free(file_name);

	// The file has to be big enough to hold a version number
	if (lseek(fd, 0, SEEK_END) < record_size)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError testing if a version file size is ok");
		goto abort_and_close;
	}

	// Go back to the beginning of the file, where the version number is stored
	if (lseek(fd, 0, SEEK_SET) != 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError setting the offset of a version file to 0");
		goto abort_and_close;
	}

	// Read the latest version number
	if (read(fd, &stored_version, sizeof(obiversion_t)) < ((ssize_t) sizeof(obiversion_t)))
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError reading the latest version number in a version file");
		goto abort_and_close;
	}

	if (close(fd) < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nError closing a version file");
		return -1;
	}

	return stored_version;

abort_and_close:
	close(fd);
	return -1;
}

866

867

868 869 870 871 872 873 874 875 876
// Returns the latest version number of the column named 'column_name' in a DMS:
// the column directory is opened with obi_open_column_directory(), then the
// version number is read with obi_get_latest_version_number().
// Returns -1 if an error occurred.
// NOTE(review): the column directory structure is not closed here - confirm
// whether its lifetime is handled elsewhere.
obiversion_t obi_column_get_latest_version_from_name(OBIDMS_p dms, const char* column_name)
{
	OBIDMS_column_directory_p	directory;
	obiversion_t				version;

	// Get the column directory structure associated to the column
	directory = obi_open_column_directory(dms, column_name);
	if (directory == NULL)
	{
		obidebug(1, "\nProblem opening a column directory structure");
		return -1;
	}

	// Get the latest version number
	version = obi_get_latest_version_number(directory);
	if (version >= 0)
		return version;

	{
		obidebug(1, "\nProblem getting the latest version number in a column directory");
	}
	return -1;
}


893 894
// TODO make private
/*
 * Computes the size in bytes of a column header, rounded up to a whole number
 * of memory pages.
 *
 * The unrounded size is the size of the OBIDMS_column_header_t structure, plus
 * two int64_t per element of a line, plus elts_names_length chars.
 *
 * The rounding uses integer arithmetic only: going through a double (ceil())
 * is exact only up to 2^53 and relies on an implicit double to size_t
 * conversion, while the integer form is exact for any size_t value.
 *
 * @param nb_elements_per_line The number of elements per line of the column.
 * @param elts_names_length The number of chars reserved for the elements names.
 *
 * @returns The header size, as a multiple of the value returned by getpagesize().
 */
size_t obi_calculate_header_size(index_t nb_elements_per_line, int64_t elts_names_length)
{
	size_t header_size;
	size_t page_size;
	size_t page_count;

	header_size = sizeof(OBIDMS_column_header_t);
	header_size = header_size + (nb_elements_per_line*2)*sizeof(int64_t) + elts_names_length*sizeof(char);

	// Query the page size once and work in size_t from here on
	page_size = (size_t) getpagesize();

	// Ceiling division written so that the intermediate sum can't overflow
	page_count = header_size / page_size;
	if ((header_size % page_size) != 0)
		page_count++;

	return page_count * page_size;
}


911 912 913 914 915
OBIDMS_column_p obi_create_column(OBIDMS_p     dms,
		                          const char*  column_name,
								  OBIType_t    data_type,
								  index_t      nb_lines,
								  index_t      nb_elements_per_line,
916
								  char*        elements_names,
917
								  bool		   elt_names_formatted,
918
								  bool		   tuples,
919
								  bool         to_eval,
920 921 922
								  const char*  indexer_name,
								  const char*  associated_column_name,
								  obiversion_t associated_column_version,
923
								  const char*  comments
Celine Mercier committed
924
								 )
Eric Coissac committed
925
{
926
	OBIDMS_column_p 			new_column;
927
	OBIDMS_column_directory_p	column_directory;
928 929 930 931 932 933 934
	OBIDMS_column_header_p 		header;
	size_t 						file_size;
	obiversion_t 				version_number;
	char* 						column_file_name;
	int 						column_file_descriptor;
	size_t 						header_size;
	size_t 						data_size;
935
	int 						comments_ok;
936
	index_t						minimum_line_count;
937 938
	OBIType_t  					returned_data_type;
	OBIType_t  					stored_data_type;
939
	char*			    		final_indexer_name;
940
	char*						built_elements_names = NULL;
941
	int64_t						elts_names_length;
942 943 944

	new_column = NULL;

945 946 947
	// Check that the informations given are not NULL/invalid/greater than the allowed sizes
	if (dms == NULL)
	{
948
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
949 950 951 952 953
		obidebug(1, "\nCan't create column because of invalid DMS");
		return NULL;
	}
	if (column_name == NULL)
	{
954
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
955 956 957
		obidebug(1, "\nCan't create column because of empty column name");
		return NULL;
	}
958 959 960 961 962 963 964
	if (nb_elements_per_line < 1)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
		obidebug(1, "\nCan't create column: the number of elements per line can't be less than 1");
		return NULL;
	}
	if ((data_type < 1) || (data_type > 8))		// TODO check in more robust way ?
965
	{
966
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
967 968 969
		obidebug(1, "\nCan't create column because of invalid data type");
		return NULL;
	}
970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987

	// Get the column directory structure associated to the column
	column_directory = obi_column_directory(dms, column_name);
	if (column_directory == NULL)
	{
		obi_set_errno(OBICOLDIR_UNKNOWN_ERROR);
		obidebug(1, "\nError opening a column directory structure");
		return NULL;
	}

	// Get the latest version number
	version_number = obi_get_new_version_number(column_directory, true);
	if (version_number < 0)
	{
		return NULL;
	}

	// Build the indexer name if needed
988
	if ((data_type == OBI_STR) || (data_type == OBI_SEQ) || (data_type == OBI_QUAL) || tuples)
989
	{
Eric Coissac committed
990
		if ((indexer_name == NULL) || (*indexer_name == 0))
991 992 993 994 995 996 997 998 999 1000
		{
			final_indexer_name = obi_build_indexer_name(column_name, version_number);
			if (final_indexer_name == NULL)
				return NULL;
		}
		else
		{
			final_indexer_name = (char*) malloc((strlen(indexer_name)+1)*sizeof(char));
			strcpy(final_indexer_name, indexer_name);
		}
1001
	}
1002

1003
	returned_data_type = data_type;
1004
	if ((data_type == OBI_STR) || (data_type == OBI_SEQ) || (data_type == OBI_QUAL) || tuples)
1005 1006 1007 1008 1009
	// stored data is indices referring to data stored elsewhere
		stored_data_type = OBI_IDX;
	else
		stored_data_type = returned_data_type;

1010
	// The initial line count should be between the minimum (corresponding to the page size) and the maximum allowed
Celine Mercier committed
1011
	minimum_line_count = get_line_count_per_page(stored_data_type, nb_elements_per_line);
1012 1013
	if (minimum_line_count == 0)	// Happens if high number of elements per line
		minimum_line_count = 1;
1014 1015
	if (nb_lines > MAXIMUM_LINE_COUNT)
	{
1016
		obidebug(1, "\nCan't create column because of line count greater than the maximum allowed (%d)", MAXIMUM_LINE_COUNT);
1017 1018 1019 1020 1021
		return NULL;
	}
	else if (nb_lines < minimum_line_count)
		nb_lines = minimum_line_count;

1022
	// Check, format, and build if needed the element names
1023
	if ((elements_names == NULL) || (*elements_names == '\0'))	// Build the default element names
1024
	{
1025 1026
		built_elements_names = build_default_elements_names(nb_elements_per_line);
		if (built_elements_names == NULL)
1027
			return NULL;
1028
		elements_names = built_elements_names;
1029
	}
1030
	else
1031
	{ // The number of elements names should be equal to the number of elements per line
1032
		if (check_elt_names_count(elements_names, elt_names_formatted) != nb_elements_per_line)
1033
		{
1034 1035
			obidebug(1, "\nCan't create column because the number of elements names given is not equal to the number of elements per line:"
					"\n%lld elements per line\nelements names:%s\n", nb_elements_per_line, elements_names);
1036 1037 1038
			return NULL;
		}
	}
1039 1040 1041

	// Format the elements names string
	if (! elt_names_formatted)
1042 1043
		format_elements_names(elements_names);
	elts_names_length = get_formatted_elt_names_length(elements_names, nb_elements_per_line);
1044

1045
	// Calculate the size needed
1046
	header_size = obi_calculate_header_size(nb_elements_per_line, elts_names_length);
Celine Mercier committed
1047
	data_size = obi_array_sizeof(stored_data_type, nb_lines, nb_elements_per_line);
1048
	file_size = header_size + data_size;
Eric Coissac committed
1049

1050
	// Get the column file name
1051
	column_file_name = build_column_file_name(column_name, version_number);
1052 1053
	if (column_file_name == NULL)
		return NULL;
1054

1055
	// Open the column file
1056
	column_file_descriptor = openat(column_directory->dir_fd, column_file_name, O_RDWR | O_CREAT | O_EXCL, 0777);
1057 1058 1059
	if (column_file_descriptor < 0)
	{
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
1060
		obidebug(1, "\nError opening a column file %s", column_file_name);
1061 1062 1063
		free(column_file_name);
		return NULL;
	}
Eric Coissac committed
1064

1065 1066
	free(column_file_name);

1067
	// Truncate the column file to the right size
1068
	if (ftruncate(column_file_descriptor, file_size) < 0)
Eric Coissac committed
1069
	{
1070
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
1071
		obidebug(1, "\nError truncating a column file to the right size");
1072
		close(column_file_descriptor);
Eric Coissac committed
1073 1074 1075
		return NULL;
	}

1076
	// Allocate the memory for the column structure
1077
	new_column = (OBIDMS_column_p) malloc(sizeof(OBIDMS_column_t));
1078
	if (new_column == NULL)
Eric Coissac committed
1079
	{
1080
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
1081
		obidebug(1, "\nError allocating the memory for the column structure");
1082
		close(column_file_descriptor);
Eric Coissac committed
1083 1084 1085
		return NULL;
	}

1086 1087
	// Fill the column structure
	new_column->dms    			 = dms;
1088
	new_column->column_directory = column_directory;
1089 1090 1091 1092 1093 1094 1095
	new_column->header 			 = mmap(NULL,
			                  	  	    header_size,
			                  	  	    PROT_READ | PROT_WRITE,
			                  	  	    MAP_SHARED,
			                  	  	    column_file_descriptor,
			                  	  	    0
			                 	 	   );
Eric Coissac committed
1096

1097
	if (new_column->header == MAP_FAILED)
Eric Coissac committed
1098
	{
1099
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
1100
		obidebug(1, "\nError mmapping the header of a column");
1101 1102
		close(column_file_descriptor);
		free(new_column);
Eric Coissac committed
1103 1104 1105
		return NULL;
	}

1106 1107 1108 1109 1110 1111 1112
	new_column->data   			 = mmap(NULL,
			                 	 	 	data_size,
			                 	 	 	PROT_READ | PROT_WRITE,
			                 	 	 	MAP_SHARED,
			                 	 	 	column_file_descriptor,
			                 	 	 	header_size
			                			);
Eric Coissac committed
1113

1114
	if (new_column->data == MAP_FAILED)
Eric Coissac committed
1115
	{
1116
		obi_set_errno(OBICOL_UNKNOWN_ERROR);
1117 1118
		obidebug(1, "\nError mmapping the data of a column.\nArguments: data_size=%lu, column_file_descriptor=%d, header_size=%lu",
				data_size, column_file_descriptor, header_size);
1119
		munmap(new_column->header, header_size);
1120 1121
		close(column_file_descriptor);
		free(new_column);
Eric Coissac committed
1122 1123 1124
		return NULL;
	}

Celine Mercier committed
1125
	new_column->writable = true;
Eric Coissac committed
1126

1127 1128 1129 1130 1131
	header 				  		 		  = new_column->header;
	header->header_size   		   		  = header_size;
	header->data_size			 		  = data_size;
	header->line_count   	 	 		  = nb_lines;
	header->lines_used    		          = 0;
Celine Mercier committed
1132
	header->nb_elements_per_line          = nb_elements_per_line;
1133 1134
	header->stored_data_type     		  = stored_data_type;
	header->returned_data_type   		  = returned_data_type;
1135
	header->tuples     					  = tuples;
1136
	header->to_eval                       = to_eval;
1137 1138 1139
	header->creation_date 		 		  = time(NULL);
	header->version       		          = version_number;
	header->cloned_from    		          = -1;
1140
	header->finished 					  = false;
1141

1142 1143
	set_elements_names(new_column, elements_names, elts_names_length, nb_elements_per_line);
	read_elt_names_informations(header);
1144